// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using Microsoft.Win32.SafeHandles;

namespace System.Net.Sockets
{
    // Note on asynchronous behavior here:

    // The asynchronous socket operations here generally do the following:
    // (1) If the operation queue is Ready (queue is empty), try to perform the operation immediately, non-blocking.
    // If this completes (i.e. does not return EWOULDBLOCK), then we return the results immediately
    // for either success (SocketError.Success) or failure.
    // No callback will happen; callers are expected to handle these synchronous completions themselves.
    // (2) If EWOULDBLOCK is returned, or the queue is not empty, then we enqueue an operation to the
    // appropriate queue and return SocketError.IOPending.
    // Enqueuing itself may fail because the socket is closed before the operation can be enqueued;
    // in this case, we return SocketError.OperationAborted (which matches what Winsock would return in this case).
    // (3) When we receive an epoll notification for the socket, we post a work item to the threadpool
    // to perform the I/O and invoke the callback with the I/O result.

    // Synchronous operations generally do the same, except that instead of returning IOPending,
    // they block on an event handle until the operation is processed by the queue.

    // See comments on OperationQueue below for more details of how the queue coordination works.

    internal sealed partial class SocketAsyncContext
    {
        // Cached operation instances for operations commonly repeated on the same socket instance,
        // e.g. async accepts, sends/receives with single and multiple buffers.  More can be
        // added in the future if necessary, at the expense of extra fields here.  With a larger
        // refactoring, these could also potentially be moved to SocketAsyncEventArgs, which
        // would be more invasive but which would allow them to be reused across socket instances
        // and also eliminate the interlocked necessary to rent the instances.
        private AcceptOperation? _cachedAcceptOperation;
        private BufferMemoryReceiveOperation? _cachedBufferMemoryReceiveOperation;
        private BufferListReceiveOperation? _cachedBufferListReceiveOperation;
        private BufferMemorySendOperation? _cachedBufferMemorySendOperation;
        private BufferListSendOperation? _cachedBufferListSendOperation;

        private void ReturnOperation(AcceptOperation operation)
        {
            // Drop everything that belonged to the finished accept so the cached instance
            // does not keep the callback or the address buffer alive.
            operation.SocketAddress = default;
            operation.Callback = null;

            // Put the state machine back to its initial state so the instance can be rented again.
            operation.Reset();

            // Publish into the single cache slot.  A concurrent return simply overwrites the slot.
            Volatile.Write(ref _cachedAcceptOperation, operation); // benign race condition
        }

        private void ReturnOperation(BufferMemoryReceiveOperation operation)
        {
            // Drop everything that belonged to the finished receive so the cached instance
            // does not keep the callback, the data buffer or the address buffer alive.
            operation.SocketAddress = default;
            operation.Callback = null;
            operation.Buffer = default;

            // Put the state machine back to its initial state so the instance can be rented again.
            operation.Reset();

            // Publish into the single cache slot.  A concurrent return simply overwrites the slot.
            Volatile.Write(ref _cachedBufferMemoryReceiveOperation, operation); // benign race condition
        }

        private void ReturnOperation(BufferListReceiveOperation operation)
        {
            // Drop everything that belonged to the finished receive so the cached instance
            // does not keep the callback, the buffer list or the address buffer alive.
            operation.SocketAddress = default;
            operation.Callback = null;
            operation.Buffers = null;

            // Put the state machine back to its initial state so the instance can be rented again.
            operation.Reset();

            // Publish into the single cache slot.  A concurrent return simply overwrites the slot.
            Volatile.Write(ref _cachedBufferListReceiveOperation, operation); // benign race condition
        }

        private void ReturnOperation(BufferMemorySendOperation operation)
        {
            // Drop everything that belonged to the finished send so the cached instance
            // does not keep the callback, the data buffer or the address buffer alive.
            operation.SocketAddress = default;
            operation.Callback = null;
            operation.Buffer = default;

            // Put the state machine back to its initial state so the instance can be rented again.
            operation.Reset();

            // Publish into the single cache slot.  A concurrent return simply overwrites the slot.
            Volatile.Write(ref _cachedBufferMemorySendOperation, operation); // benign race condition
        }

        private void ReturnOperation(BufferListSendOperation operation)
        {
            // Drop everything that belonged to the finished send so the cached instance
            // does not keep the callback, the buffer list or the address buffer alive.
            operation.SocketAddress = default;
            operation.Callback = null;
            operation.Buffers = null;

            // Put the state machine back to its initial state so the instance can be rented again.
            operation.Reset();

            // Publish into the single cache slot.  A concurrent return simply overwrites the slot.
            Volatile.Write(ref _cachedBufferListSendOperation, operation); // benign race condition
        }

        private AcceptOperation RentAcceptOperation()
        {
            // Atomically take whatever is in the cache slot (leaving it empty); allocate only on a miss.
            AcceptOperation? cached = Interlocked.Exchange(ref _cachedAcceptOperation, null);
            return cached ?? new AcceptOperation(this);
        }

        private BufferMemoryReceiveOperation RentBufferMemoryReceiveOperation()
        {
            // Atomically take whatever is in the cache slot (leaving it empty); allocate only on a miss.
            BufferMemoryReceiveOperation? cached = Interlocked.Exchange(ref _cachedBufferMemoryReceiveOperation, null);
            return cached ?? new BufferMemoryReceiveOperation(this);
        }

        private BufferListReceiveOperation RentBufferListReceiveOperation()
        {
            // Atomically take whatever is in the cache slot (leaving it empty); allocate only on a miss.
            BufferListReceiveOperation? cached = Interlocked.Exchange(ref _cachedBufferListReceiveOperation, null);
            return cached ?? new BufferListReceiveOperation(this);
        }

        private BufferMemorySendOperation RentBufferMemorySendOperation()
        {
            // Atomically take whatever is in the cache slot (leaving it empty); allocate only on a miss.
            BufferMemorySendOperation? cached = Interlocked.Exchange(ref _cachedBufferMemorySendOperation, null);
            return cached ?? new BufferMemorySendOperation(this);
        }

        private BufferListSendOperation RentBufferListSendOperation()
        {
            // Atomically take whatever is in the cache slot (leaving it empty); allocate only on a miss.
            BufferListSendOperation? cached = Interlocked.Exchange(ref _cachedBufferListSendOperation, null);
            return cached ?? new BufferListSendOperation(this);
        }

        // Base class for every operation that can be placed on one of the context's operation queues.
        // It owns the per-operation state machine (see State) that arbitrates between an attempt to
        // perform the I/O (TryComplete) and a concurrent cancellation request (TryCancel), and it
        // knows how to hand the result back: by signalling Event for synchronous callers, or by
        // running the derived type's InvokeCallback for asynchronous ones.
        private abstract class AsyncOperation : IThreadPoolWorkItem
        {
            // Lifecycle of a single operation, as driven by TryComplete and TryCancel:
            //   Waiting -> Running                         TryComplete starts an attempt.
            //   Running -> Complete                        The attempt finished the operation.
            //   Running -> Waiting                         The attempt did not finish; it can be retried later.
            //   Waiting -> Canceled                        TryCancel while no attempt is in flight.
            //   Running -> RunningWithPendingCancellation  TryCancel while an attempt is in flight.
            //   RunningWithPendingCancellation -> Complete or Canceled, decided by the in-flight TryComplete.
            private enum State
            {
                Waiting = 0,
                Running,
                RunningWithPendingCancellation,
                Complete,
                Canceled
            }

            // Current position in the state machine above.  Updated with CompareExchange where
            // TryComplete and TryCancel can race, and with a plain volatile write otherwise.
            private volatile AsyncOperation.State _state;

#if DEBUG
            private bool _callbackQueued; // When true, the callback has been queued.
#endif

            // The context this operation was created for; fixed for the lifetime of the instance.
            public readonly SocketAsyncContext AssociatedContext;
            public AsyncOperation Next = null!; // initialized by helper called from ctor
            // Outcome of the operation.  Written by the DoTryComplete implementations and set to
            // OperationAborted by ProcessCancellation and DoAbort.
            public SocketError ErrorCode;
            // Socket address buffer associated with the operation; derived operations pass it to the
            // PAL calls and trim it to the length those calls report.
            public Memory<byte> SocketAddress;
            // Registration for cancellation of this operation; disposed by TryCancel.
            public CancellationTokenRegistration CancellationRegistration;

            // When non-null the operation is synchronous: completion and cancellation set this event
            // instead of invoking a callback.
            public ManualResetEventSlim? Event { get; set; }

            public AsyncOperation(SocketAsyncContext context)
            {
                AssociatedContext = context;
                Reset();
            }

            // Restores the initial state (Waiting, no event, Next pointing at this instance) so the
            // object can be used, or reused after being returned to the context's cache.
            public void Reset()
            {
                _state = State.Waiting;
                Event = null;
                Next = this;
#if DEBUG
                _callbackQueued = false;
#endif
            }

            // Makes one attempt to perform the I/O.
            // Returns Completed when DoTryComplete reported that the operation finished,
            // Pending when it did not finish and the operation went back to Waiting, and
            // Cancelled when the operation had already been canceled or a cancellation arrived
            // while the attempt was running (in which case the cancellation is processed here).
            public OperationResult TryComplete(SocketAsyncContext context)
            {
                TraceWithContext(context, "Enter");

                // Set state to Running, unless we've been canceled
                State oldState = Interlocked.CompareExchange(ref _state, State.Running, State.Waiting);
                if (oldState == State.Canceled)
                {
                    TraceWithContext(context, "Exit, Previously canceled");
                    return OperationResult.Cancelled;
                }

                Debug.Assert(oldState == State.Waiting, $"Unexpected operation state: {(State)oldState}");

                // Try to perform the IO
                if (DoTryComplete(context))
                {
                    Debug.Assert(_state is State.Running or State.RunningWithPendingCancellation, "Unexpected operation state");

                    // The operation finished, so a cancellation that arrived during the attempt
                    // (RunningWithPendingCancellation) is simply dropped.
                    _state = State.Complete;

                    TraceWithContext(context, "Exit, Completed");
                    return OperationResult.Completed;
                }

                // Set state back to Waiting, unless we were canceled, in which case we have to process cancellation now
                State newState;
                while (true)
                {
                    State state = _state;
                    Debug.Assert(state is State.Running or State.RunningWithPendingCancellation, $"Unexpected operation state: {(State)state}");

                    newState = (state == State.Running ? State.Waiting : State.Canceled);
                    if (state == Interlocked.CompareExchange(ref _state, newState, state))
                    {
                        break;
                    }

                    // Race to update the state. Loop and try again.
                }

                if (newState == State.Canceled)
                {
                    // TryCancel deferred to us (it saw Running), so the cancellation is completed here.
                    ProcessCancellation();
                    TraceWithContext(context, "Exit, Newly cancelled");
                    return OperationResult.Cancelled;
                }

                TraceWithContext(context, "Exit, Pending");
                return OperationResult.Pending;
            }

            // Requests cancellation.  Returns true only when the operation was Waiting and this call
            // moved it to Canceled and processed the cancellation.  Returns false when the operation
            // is already Complete or Canceled, already has a cancellation pending, or is currently
            // Running (the cancellation is then recorded and left for TryComplete to resolve).
            public bool TryCancel()
            {
                Trace("Enter");

                // Note we could be cancelling because of socket close. Regardless, we don't need the registration anymore.
                CancellationRegistration.Dispose();

                State newState;
                while (true)
                {
                    State state = _state;
                    if (state is State.Complete or State.Canceled or State.RunningWithPendingCancellation)
                    {
                        return false;
                    }

                    newState = (state == State.Waiting ? State.Canceled : State.RunningWithPendingCancellation);
                    if (state == Interlocked.CompareExchange(ref _state, newState, state))
                    {
                        break;
                    }

                    // Race to update the state. Loop and try again.
                }

                if (newState == State.RunningWithPendingCancellation)
                {
                    // TryComplete will either succeed, or it will see the pending cancellation and deal with it.
                    return false;
                }

                ProcessCancellation();

                // Note, we leave the operation in the OperationQueue.
                // When we get around to processing it, we'll see it's cancelled and skip it.
                return true;
            }

            // Finishes a cancellation once the state is Canceled: records OperationAborted and then
            // either wakes the synchronous waiter or queues the callback to the thread pool.
            public void ProcessCancellation()
            {
                Trace("Enter");

                Debug.Assert(_state == State.Canceled);

                ErrorCode = SocketError.OperationAborted;

                ManualResetEventSlim? e = Event;
                if (e != null)
                {
                    e.Set();
                }
                else
                {
#if DEBUG
                    Debug.Assert(!Interlocked.Exchange(ref _callbackQueued, true), $"Unexpected _callbackQueued: {_callbackQueued}");
#endif
                    // We've marked the operation as canceled, and so should invoke the callback, but
                    // we can't pool the object, as ProcessQueue may still have a reference to it, due to
                    // using a pattern whereby it takes the lock to grab an item, but then releases the lock
                    // to do further processing on the item that's still in the list.
                    ThreadPool.UnsafeQueueUserWorkItem(o => ((AsyncOperation)o!).InvokeCallback(allowPooling: false), this);
                }
            }

            // Hands the operation off for processing: synchronous operations have their waiting
            // thread signalled, asynchronous ones are queued to the thread pool.
            public void Dispatch()
            {
                ManualResetEventSlim? e = Event;
                if (e != null)
                {
                    // Sync operation.  Signal waiting thread to continue processing.
                    e.Set();
                }
                else
                {
                    // Async operation.
                    Schedule();
                }
            }

            // Queues this operation itself as the thread pool work item (no extra allocation).
            // Only valid for asynchronous operations, i.e. when Event is null.
            public void Schedule()
            {
                Debug.Assert(Event == null);

                // Async operation.  Process the IO on the threadpool.
                ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
            }

            // Runs the work item inline on the calling thread by invoking the IThreadPoolWorkItem
            // implementation through the interface.
            public void Process() => ((IThreadPoolWorkItem)this).Execute();

            void IThreadPoolWorkItem.Execute()
            {
                // ReadOperation and WriteOperation, the only two types derived from
                // AsyncOperation, implement IThreadPoolWorkItem.Execute to call
                // ProcessAsyncOperation(this) on the appropriate receive or send queue.
                // However, this base class needs to be able to queue them without
                // additional allocation, so it also implements the interface in order
                // to pass the compiler's static checking for the interface, but then
                // when the runtime queries for the interface, it'll use the derived
                // type's interface implementation.  We could instead just make this
                // an abstract and have the derived types override it, but that adds
                // "Execute" as a public method, which could easily be misunderstood.
                // We could also add an abstract method that the base interface implementation
                // invokes, but that adds an extra virtual dispatch.
                Debug.Fail("Expected derived type to implement IThreadPoolWorkItem");
                throw new InvalidOperationException();
            }

            // Called when op is not in the queue yet, so can't be otherwise executing
            public void DoAbort()
            {
                ErrorCode = SocketError.OperationAborted;
            }

            // Performs one non-blocking attempt at the actual I/O.  Returns true when the operation
            // is finished (the outcome is in ErrorCode) and false when it has to be retried later.
            protected abstract bool DoTryComplete(SocketAsyncContext context);

            // Invokes the operation's completion callback.  When allowPooling is true the
            // implementation may return this instance to the context's cache before calling back.
            public abstract void InvokeCallback(bool allowPooling);

            // Tracing helper; calls are only compiled in when SOCKETASYNCCONTEXT_TRACE is defined.
            [Conditional("SOCKETASYNCCONTEXT_TRACE")]
            public void Trace(string message, [CallerMemberName] string? memberName = null)
            {
                OutputTrace($"{IdOf(this)}.{memberName}: {message}");
            }

            // Same as Trace, but also includes the identity of the supplied context in the output.
            [Conditional("SOCKETASYNCCONTEXT_TRACE")]
            public void TraceWithContext(SocketAsyncContext context, string message, [CallerMemberName] string? memberName = null)
            {
                OutputTrace($"{IdOf(context)}, {IdOf(this)}.{memberName}: {message}");
            }
        }

        // These two abstract classes differentiate the operations that go in the
        // read queue vs the ones that go in the write queue.
        private abstract class ReadOperation : AsyncOperation, IThreadPoolWorkItem
        {
            public ReadOperation(SocketAsyncContext context)
                : base(context)
            {
            }

            // Thread pool entry point: lets the owning context process this operation as a read.
            void IThreadPoolWorkItem.Execute()
            {
                AssociatedContext.ProcessAsyncReadOperation(this);
            }
        }

        private abstract class WriteOperation : AsyncOperation, IThreadPoolWorkItem
        {
            public WriteOperation(SocketAsyncContext context)
                : base(context)
            {
            }

            // Thread pool entry point: lets the owning context process this operation as a write.
            void IThreadPoolWorkItem.Execute()
            {
                AssociatedContext.ProcessAsyncWriteOperation(this);
            }
        }

        // Shared base for the send-style write operations: holds the send parameters, the running
        // byte count and the completion callback.
        private abstract class SendOperation : WriteOperation
        {
            public SocketFlags Flags;
            public int BytesTransferred;
            public int Offset;
            public int Count;

            public SendOperation(SocketAsyncContext context)
                : base(context)
            {
            }

            // Completion callback, invoked with (bytes transferred, socket address, flags, error).
            public Action<int, Memory<byte>, SocketFlags, SocketError>? Callback { get; set; }

            // Default completion path: reports the operation's current fields to the callback.
            // allowPooling is not consulted here; derived types that are cached override this.
            public override void InvokeCallback(bool allowPooling)
            {
                Callback!(BytesTransferred, SocketAddress, SocketFlags.None, ErrorCode);
            }
        }

        // Send of a single contiguous Memory<byte> buffer.
        private class BufferMemorySendOperation : SendOperation
        {
            public Memory<byte> Buffer;

            public BufferMemorySendOperation(SocketAsyncContext context)
                : base(context)
            {
            }

            protected override bool DoTryComplete(SocketAsyncContext context)
            {
                // Only one buffer is involved, so no buffer list is passed and the buffer index
                // is a throwaway local that is discarded after the call.
                int unusedBufferIndex = 0;
                return SocketPal.TryCompleteSendTo(
                    context._socket,
                    Buffer.Span,
                    null,
                    ref unusedBufferIndex,
                    ref Offset,
                    ref Count,
                    Flags,
                    SocketAddress.Span,
                    ref BytesTransferred,
                    out ErrorCode);
            }

            public override void InvokeCallback(bool allowPooling)
            {
                // Snapshot the results first: returning the instance to the cache clears its
                // callback and address, and makes it available to be rented for another send.
                Action<int, Memory<byte>, SocketFlags, SocketError> callback = Callback!;
                int bytesTransferred = BytesTransferred;
                Memory<byte> socketAddress = SocketAddress;
                SocketError errorCode = ErrorCode;

                if (allowPooling)
                {
                    AssociatedContext.ReturnOperation(this);
                }

                callback(bytesTransferred, socketAddress, SocketFlags.None, errorCode);
            }
        }

        // Send of a list of array segments; BufferIndex tracks progress through the list across attempts.
        private sealed class BufferListSendOperation : SendOperation
        {
            public IList<ArraySegment<byte>>? Buffers;
            public int BufferIndex;

            public BufferListSendOperation(SocketAsyncContext context)
                : base(context)
            {
            }

            protected override bool DoTryComplete(SocketAsyncContext context)
            {
                // No single buffer is supplied; the data comes from the Buffers list.
                return SocketPal.TryCompleteSendTo(
                    context._socket,
                    default(ReadOnlySpan<byte>),
                    Buffers,
                    ref BufferIndex,
                    ref Offset,
                    ref Count,
                    Flags,
                    SocketAddress.Span,
                    ref BytesTransferred,
                    out ErrorCode);
            }

            public override void InvokeCallback(bool allowPooling)
            {
                // Snapshot the results first: returning the instance to the cache clears its
                // callback and address, and makes it available to be rented for another send.
                Action<int, Memory<byte>, SocketFlags, SocketError> callback = Callback!;
                int bytesTransferred = BytesTransferred;
                Memory<byte> socketAddress = SocketAddress;
                SocketError errorCode = ErrorCode;

                if (allowPooling)
                {
                    AssociatedContext.ReturnOperation(this);
                }

                callback(bytesTransferred, socketAddress, SocketFlags.None, errorCode);
            }
        }

        // Send from a raw byte pointer.
        private sealed unsafe class BufferPtrSendOperation : SendOperation
        {
            public byte* BufferPtr;

            public BufferPtrSendOperation(SocketAsyncContext context)
                : base(context)
            {
            }

            protected override bool DoTryComplete(SocketAsyncContext context)
            {
                // TryCompleteSendTo expects the entire buffer, which it then indexes into with the
                // ref Offset and ref Count arguments, so the span has to cover Offset + Count bytes.
                var entireBuffer = new ReadOnlySpan<byte>(BufferPtr, Offset + Count);
                int unusedBufferIndex = 0;

                return SocketPal.TryCompleteSendTo(
                    context._socket,
                    entireBuffer,
                    null,
                    ref unusedBufferIndex,
                    ref Offset,
                    ref Count,
                    Flags,
                    SocketAddress.Span,
                    ref BytesTransferred,
                    out ErrorCode);
            }
        }

        // Shared base for the receive-style read operations: holds the requested and received
        // flags, the byte count and the completion callback.
        private abstract class ReceiveOperation : ReadOperation
        {
            public SocketFlags Flags;
            public SocketFlags ReceivedFlags;
            public int BytesTransferred;

            public ReceiveOperation(SocketAsyncContext context)
                : base(context)
            {
            }

            // Completion callback, invoked with (bytes transferred, socket address, received flags, error).
            public Action<int, Memory<byte>, SocketFlags, SocketError>? Callback { get; set; }

            // Default completion path: reports the operation's current fields to the callback.
            // allowPooling is not consulted here; derived types that are cached override this.
            public override void InvokeCallback(bool allowPooling)
            {
                Callback!(BytesTransferred, SocketAddress, ReceivedFlags, ErrorCode);
            }
        }

        // Receive into a single contiguous Memory<byte> buffer.
        private sealed class BufferMemoryReceiveOperation : ReceiveOperation
        {
            public Memory<byte> Buffer;
            public bool SetReceivedFlags;

            public BufferMemoryReceiveOperation(SocketAsyncContext context)
                : base(context)
            {
            }

            protected override bool DoTryComplete(SocketAsyncContext context)
            {
                // Zero byte read is performed to know when data is available.
                // We don't have to call receive, our caller is interested in the event.
                bool isZeroByteRead = Buffer.Length == 0 && Flags == SocketFlags.None && SocketAddress.Length == 0;
                if (isZeroByteRead)
                {
                    BytesTransferred = 0;
                    ReceivedFlags = SocketFlags.None;
                    ErrorCode = SocketError.Success;
                    return true;
                }

                if (!SetReceivedFlags)
                {
                    // Plain receive: no address is expected and no flags are reported back.
                    Debug.Assert(SocketAddress.Length == 0);

                    ReceivedFlags = SocketFlags.None;
                    return SocketPal.TryCompleteReceive(context._socket, Buffer.Span, Flags, out BytesTransferred, out ErrorCode);
                }

                // Receive that also reports the received flags and the socket address.
                bool finished = SocketPal.TryCompleteReceiveFrom(
                    context._socket,
                    Buffer.Span,
                    null,
                    Flags,
                    SocketAddress.Span,
                    out int addressLength,
                    out BytesTransferred,
                    out ReceivedFlags,
                    out ErrorCode);

                if (!finished)
                {
                    return false;
                }

                if (ErrorCode == SocketError.Success)
                {
                    // Trim the address buffer to the length reported by the receive call.
                    SocketAddress = SocketAddress.Slice(0, addressLength);
                }

                return true;
            }

            public override void InvokeCallback(bool allowPooling)
            {
                // Snapshot the results first: returning the instance to the cache clears its
                // callback and address, and makes it available to be rented for another receive.
                Action<int, Memory<byte>, SocketFlags, SocketError> callback = Callback!;
                int bytesTransferred = BytesTransferred;
                Memory<byte> socketAddress = SocketAddress;
                SocketFlags receivedFlags = ReceivedFlags;
                SocketError errorCode = ErrorCode;

                if (allowPooling)
                {
                    AssociatedContext.ReturnOperation(this);
                }

                callback(bytesTransferred, socketAddress, receivedFlags, errorCode);
            }
        }

        // Receive into a list of array segments.
        private sealed class BufferListReceiveOperation : ReceiveOperation
        {
            public IList<ArraySegment<byte>>? Buffers;

            public BufferListReceiveOperation(SocketAsyncContext context)
                : base(context)
            {
            }

            protected override bool DoTryComplete(SocketAsyncContext context)
            {
                // No single buffer is supplied; the data goes into the Buffers list.
                bool finished = SocketPal.TryCompleteReceiveFrom(
                    context._socket,
                    default(Span<byte>),
                    Buffers,
                    Flags,
                    SocketAddress.Span,
                    out int addressLength,
                    out BytesTransferred,
                    out ReceivedFlags,
                    out ErrorCode);

                if (!finished)
                {
                    return false;
                }

                if (ErrorCode == SocketError.Success)
                {
                    // Trim the address buffer to the length reported by the receive call.
                    SocketAddress = SocketAddress.Slice(0, addressLength);
                }

                return true;
            }

            public override void InvokeCallback(bool allowPooling)
            {
                // Snapshot the results first: returning the instance to the cache clears its
                // callback and address, and makes it available to be rented for another receive.
                Action<int, Memory<byte>, SocketFlags, SocketError> callback = Callback!;
                int bytesTransferred = BytesTransferred;
                Memory<byte> socketAddress = SocketAddress;
                SocketFlags receivedFlags = ReceivedFlags;
                SocketError errorCode = ErrorCode;

                if (allowPooling)
                {
                    AssociatedContext.ReturnOperation(this);
                }

                callback(bytesTransferred, socketAddress, receivedFlags, errorCode);
            }
        }

        // Receive into a raw byte pointer of the given Length.
        private sealed unsafe class BufferPtrReceiveOperation : ReceiveOperation
        {
            public byte* BufferPtr;
            public int Length;

            public BufferPtrReceiveOperation(SocketAsyncContext context)
                : base(context)
            {
            }

            protected override bool DoTryComplete(SocketAsyncContext context)
            {
                var destination = new Span<byte>(BufferPtr, Length);

                bool finished = SocketPal.TryCompleteReceiveFrom(
                    context._socket,
                    destination,
                    null,
                    Flags,
                    SocketAddress.Span,
                    out int addressLength,
                    out BytesTransferred,
                    out ReceivedFlags,
                    out ErrorCode);

                if (!finished)
                {
                    return false;
                }

                if (ErrorCode == SocketError.Success)
                {
                    // Trim the address buffer to the length reported by the receive call.
                    SocketAddress = SocketAddress.Slice(0, addressLength);
                }

                return true;
            }
        }

        // Receive that additionally reports IP packet information, reading into either a single
        // Memory<byte> buffer or a list of array segments.
        private sealed class ReceiveMessageFromOperation : ReadOperation
        {
            public Memory<byte> Buffer;
            public SocketFlags Flags;
            public int BytesTransferred;
            public SocketFlags ReceivedFlags;
            public IList<ArraySegment<byte>>? Buffers;

            public bool IsIPv4;
            public bool IsIPv6;
            public IPPacketInformation IPPacketInformation;

            public ReceiveMessageFromOperation(SocketAsyncContext context)
                : base(context)
            {
            }

            // Completion callback, invoked with (bytes transferred, socket address, received flags,
            // packet information, error).
            public Action<int, Memory<byte>, SocketFlags, IPPacketInformation, SocketError>? Callback { get; set; }

            protected override bool DoTryComplete(SocketAsyncContext context)
            {
                bool finished = SocketPal.TryCompleteReceiveMessageFrom(
                    context._socket,
                    Buffer.Span,
                    Buffers,
                    Flags,
                    SocketAddress,
                    out int addressLength,
                    IsIPv4,
                    IsIPv6,
                    out BytesTransferred,
                    out ReceivedFlags,
                    out IPPacketInformation,
                    out ErrorCode);

                if (!finished)
                {
                    return false;
                }

                if (ErrorCode == SocketError.Success)
                {
                    // Trim the address buffer to the length reported by the receive call.
                    SocketAddress = SocketAddress.Slice(0, addressLength);
                }

                return true;
            }

            // Reports the operation's current fields to the callback; this type is never pooled,
            // so allowPooling is not consulted.
            public override void InvokeCallback(bool allowPooling)
            {
                Callback!(BytesTransferred, SocketAddress, ReceivedFlags, IPPacketInformation, ErrorCode);
            }
        }

        // Receive that additionally reports IP packet information, reading into a raw byte
        // pointer of the given Length.
        private sealed unsafe class BufferPtrReceiveMessageFromOperation : ReadOperation
        {
            public byte* BufferPtr;
            public int Length;
            public SocketFlags Flags;
            public int BytesTransferred;
            public SocketFlags ReceivedFlags;

            public bool IsIPv4;
            public bool IsIPv6;
            public IPPacketInformation IPPacketInformation;

            public BufferPtrReceiveMessageFromOperation(SocketAsyncContext context)
                : base(context)
            {
            }

            // Completion callback, invoked with (bytes transferred, socket address, received flags,
            // packet information, error).
            public Action<int, Memory<byte>, SocketFlags, IPPacketInformation, SocketError>? Callback { get; set; }

            protected override bool DoTryComplete(SocketAsyncContext context)
            {
                var destination = new Span<byte>(BufferPtr, Length);

                bool finished = SocketPal.TryCompleteReceiveMessageFrom(
                    context._socket,
                    destination,
                    null,
                    Flags,
                    SocketAddress!,
                    out int addressLength,
                    IsIPv4,
                    IsIPv6,
                    out BytesTransferred,
                    out ReceivedFlags,
                    out IPPacketInformation,
                    out ErrorCode);

                if (!finished)
                {
                    return false;
                }

                if (ErrorCode == SocketError.Success)
                {
                    // Trim the address buffer to the length reported by the receive call.
                    SocketAddress = SocketAddress.Slice(0, addressLength);
                }

                return true;
            }

            // Reports the operation's current fields to the callback; this type is never pooled,
            // so allowPooling is not consulted.
            public override void InvokeCallback(bool allowPooling)
            {
                Callback!(BytesTransferred, SocketAddress, ReceivedFlags, IPPacketInformation, ErrorCode);
            }
        }

        // Read-queue operation that accepts a connection.  On completion the callback receives the
        // accepted file descriptor, the socket address bytes reported by the accept call, and the
        // resulting SocketError.
        private sealed class AcceptOperation : ReadOperation
        {
            // Descriptor produced by the accept call; (IntPtr)(-1) whenever ErrorCode is not Success.
            public IntPtr AcceptedFileDescriptor;

            public AcceptOperation(SocketAsyncContext context) : base(context) { }

            // Completion callback, invoked with (accepted descriptor, socket address, error).
            public Action<IntPtr, Memory<byte>, SocketError>? Callback { get; set; }

            // Makes one non-blocking attempt to accept.  Returns true when the accept finished
            // (successfully or with an error in ErrorCode) and false when it must be retried.
            protected override bool DoTryComplete(SocketAsyncContext context)
            {
                bool completed = SocketPal.TryCompleteAccept(context._socket, SocketAddress, out int socketAddressLen, out AcceptedFileDescriptor, out ErrorCode);
                Debug.Assert(ErrorCode == SocketError.Success || AcceptedFileDescriptor == (IntPtr)(-1), $"Unexpected values: ErrorCode={ErrorCode}, AcceptedFileDescriptor={AcceptedFileDescriptor}");

                // Only trim the address buffer once the accept has actually finished, matching the
                // other operations in this file.  An unfinished attempt is retried with the same
                // SocketAddress buffer, so it must not be shrunk based on a length that does not
                // describe an accepted connection.
                if (completed && ErrorCode == SocketError.Success)
                {
                    SocketAddress = SocketAddress.Slice(0, socketAddressLen);
                }
                return completed;
            }

            public override void InvokeCallback(bool allowPooling)
            {
                // Snapshot the results first: returning the instance to the cache clears its
                // callback and address, and makes it available to be rented for another accept.
                var cb = Callback!;
                IntPtr fd = AcceptedFileDescriptor;
                Memory<byte> sa = SocketAddress;
                SocketError ec = ErrorCode;

                if (allowPooling)
                {
                    AssociatedContext.ReturnOperation(this);
                }

                cb(fd, sa, ec);
            }
        }

        // Connect operation, optionally followed by a send of Buffer once the connection is
        // established.  It reuses the fields of BufferMemorySendOperation (Buffer, BytesTransferred,
        // Callback) but overrides both the completion attempt and the callback invocation.
        private sealed class ConnectOperation : BufferMemorySendOperation
        {
            public ConnectOperation(SocketAsyncContext context) : base(context) { }

            // Attempts to complete the connect and records the outcome on the socket.  When the
            // connect finished successfully and Buffer is non-empty, a send of the whole buffer is
            // started through the context, using this operation's Callback as its completion callback.
            // Returns whatever TryCompleteConnect returned.
            protected override bool DoTryComplete(SocketAsyncContext context)
            {
                bool result = SocketPal.TryCompleteConnect(context._socket, out ErrorCode);
                context._socket.RegisterConnectResult(ErrorCode);

                if (result && ErrorCode == SocketError.Success &&  Buffer.Length > 0)
                {
                    SocketError error = context.SendToAsync(Buffer, 0, Buffer.Length, SocketFlags.None, Memory<byte>.Empty, ref BytesTransferred, Callback!, default);
                    if (error != SocketError.Success && error != SocketError.IOPending)
                    {
                        // NOTE(review): the enclosing condition already established ErrorCode == Success,
                        // and the send's failure code is only in the local 'error', which is not stored
                        // or registered.  Presumably 'error' was meant to be propagated here - confirm.
                        context._socket.RegisterConnectResult(ErrorCode);
                    }
                }
                return result;
            }

            // Invokes the callback only when no data was supplied with the connect.  allowPooling is
            // ignored: this override never returns the instance to the context's cache.
            public override void InvokeCallback(bool allowPooling)
            {
                var cb = Callback!;
                int bt = BytesTransferred;
                Memory<byte> sa = SocketAddress;
                SocketError ec = ErrorCode;
                Memory<byte> buffer = Buffer;

                // NOTE(review): with a non-empty Buffer the callback is only reachable through the
                // send started in DoTryComplete on the success path.  No callback invocation is
                // visible in this block for a failed or canceled connect with a non-empty Buffer -
                // verify that callers cover that case.
                if (buffer.Length == 0)
                {
                    // Invoke callback only when we are completely done.
                    // In case data were provided for Connect we may or may not send them all.
                    // If we did not we will need follow-up with Send operation
                    cb(bt, sa, SocketFlags.None, ec);
                }
            }
        }

        // Write-queue operation that sends a range of a file, identified by its SafeFileHandle.
        private sealed class SendFileOperation : WriteOperation
        {
            public SafeFileHandle FileHandle = null!; // always set when constructed
            public long Offset;
            public long Count;
            public long BytesTransferred;

            public SendFileOperation(SocketAsyncContext context)
                : base(context)
            {
            }

            // Completion callback, invoked with (bytes transferred, error).
            public Action<long, SocketError>? Callback { get; set; }

            // Reports the byte count and error to the callback; this type is never pooled,
            // so allowPooling is not consulted.
            public override void InvokeCallback(bool allowPooling)
            {
                Callback!(BytesTransferred, ErrorCode);
            }

            // One non-blocking attempt; Offset, Count and BytesTransferred are passed by reference
            // so the PAL call can update them across attempts.
            protected override bool DoTryComplete(SocketAsyncContext context)
            {
                return SocketPal.TryCompleteSendFile(
                    context._socket,
                    FileHandle,
                    ref Offset,
                    ref Count,
                    ref BytesTransferred,
                    out ErrorCode);
            }
        }

        // Disposable wrapper around a monitor lock: constructing it acquires the lock and
        // disposing it releases the lock.  Debug builds additionally check for:
        // (1) Unexpected lock reentrancy, which should never happen
        // (2) Deadlock, by giving up (and asserting) after a reasonably large timeout
        private readonly struct LockToken : IDisposable
        {
            private readonly object _lockObject;

            public LockToken(object lockObject)
            {
                Debug.Assert(lockObject is not null);
                Debug.Assert(!Monitor.IsEntered(lockObject));

                _lockObject = lockObject;

#if DEBUG
                bool acquired = Monitor.TryEnter(lockObject, millisecondsTimeout: 10000);
                Debug.Assert(acquired, "Timed out waiting for queue lock");
#else
                Monitor.Enter(lockObject);
#endif
            }

            public void Dispose()
            {
                object held = _lockObject;
                Debug.Assert(Monitor.IsEntered(held));
                Monitor.Exit(held);
            }
        }

        // Outcome of an attempt to process an operation, as returned by
        // AsyncOperation.TryComplete and OperationQueue.ProcessQueuedOperation.
        public enum OperationResult
        {
            // The attempt did not finish the operation; it stays queued (or gets enqueued) and is retried later.
            Pending = 0,
            // The operation finished; OperationQueue.ProcessAsyncOperation invokes the callback only for this result.
            Completed = 1,
            // The operation was cancelled; ProcessQueuedOperation also returns this when it finds the queue stopped.
            Cancelled = 2
        }

        private struct OperationQueue<TOperation>
            where TOperation : AsyncOperation
        {
            // Quick overview:
            //
            // When attempting to perform an IO operation, the caller first checks IsReady,
            // and if true, attempts to perform the operation itself.
            // If this returns EWOULDBLOCK, or if the queue was not ready, then the operation
            // is enqueued by calling StartAsyncOperation and the state becomes Waiting.
            // When an epoll notification is received, we check if the state is Waiting,
            // and if so, change the state to Processing and enqueue a workitem to the threadpool
            // to try to perform the enqueued operations.
            // If an operation is successfully performed, we remove it from the queue,
            // enqueue another threadpool workitem to process the next item in the queue (if any),
            // and call the user's completion callback.
            // If we successfully process all enqueued operations, then the state becomes Ready;
            // otherwise, the state becomes Waiting and we wait for another epoll notification.

            // States of the queue state machine described in the overview above.
            // The underlying type is int so that IsReady can read _state with Volatile.Read.
            // IsReady reports both Ready and Stopped as "ready" (the caller attempts the operation itself);
            // StartAsyncOperation aborts the operation when it finds the queue Stopped.
            private enum QueueState : int
            {
                Ready = 0,          // Indicates that data MAY be available on the socket.
                                    // Queue must be empty.
                Waiting = 1,        // Indicates that data is definitely not available on the socket.
                                    // Queue must not be empty.
                Processing = 2,     // Indicates that a thread pool item has been scheduled (and may
                                    // be executing) to process the IO operations in the queue.
                                    // Queue must not be empty.
                Stopped = 3,        // Indicates that the queue has been stopped because the
                                    // socket has been closed.
                                    // Queue must be empty.
            }

            // These fields define the queue state.

            private QueueState _state;      // See above
            // True when the operation at the head of the queue has an Event, i.e. is a synchronous operation.
            // Kept in sync with the head whenever operations are enqueued to an empty queue or removed from the head.
            private bool _isNextOperationSynchronous;
            private int _sequenceNumber;    // This sequence number is updated when we receive an epoll notification.
                                            // It allows us to detect when a new epoll notification has arrived
                                            // since the last time we checked the state of the queue.
                                            // If this happens, we MUST retry the operation, otherwise we risk
                                            // "losing" the notification and causing the operation to pend indefinitely.
            // Tail of a circular singly-linked list: _tail.Next is the head, and a single queued operation
            // points to itself. Null when the queue is empty.
            private AsyncOperation? _tail;   // Queue of pending IO operations to process when data becomes available.

            // The _queueLock is used to ensure atomic access to the queue state above.
            // The lock is only ever held briefly, to read and/or update queue state, and
            // never around any external call, e.g. OS call or user code invocation.
            private object _queueLock;

            // Acquires _queueLock; dispose the returned token (typically via 'using') to release it.
            private LockToken Lock() => new LockToken(_queueLock);

            // Reads _isNextOperationSynchronous without taking the queue lock, so the value may be stale.
            public bool IsNextOperationSynchronous_Speculative => _isNextOperationSynchronous;

            // One-time setup: creates the lock object and starts the queue in the Ready state
            // with a sequence number of zero. Must not be called on an already initialized queue.
            public void Init()
            {
                Debug.Assert(_queueLock is null);

                _sequenceNumber = 0;
                _state = QueueState.Ready;
                _queueLock = new object();
            }

            // IsReady returns whether an operation can be executed immediately.
            // observedSequenceNumber must be passed to StartAsyncOperation.
            public bool IsReady(SocketAsyncContext context, out int observedSequenceNumber)
            {
                // Both _state and _sequenceNumber are read here without taking the queue lock. That is safe because:
                // - The return value is solely based on the Volatile.Read of _state.
                // - The Volatile.Read of _sequenceNumber ensures we read a value before executing the operation.
                //   This is needed to retry the operation in StartAsyncOperation in case the _sequenceNumber incremented.
                // - Because no Lock is taken, it is possible we observe a sequence number increment before the state
                //   becomes Ready. When that happens, the reported sequence number is one less than the value read,
                //   and StartAsyncOperation will execute the operation because the sequence number won't match.

                Debug.Assert(sizeof(QueueState) == sizeof(int));

                // Keep this order: state first, then sequence number.
                QueueState currentState = (QueueState)Volatile.Read(ref Unsafe.As<QueueState, int>(ref _state));
                int sequenceNumber = Volatile.Read(ref _sequenceNumber);

                bool isReady = currentState is QueueState.Ready or QueueState.Stopped;
                observedSequenceNumber = isReady ? sequenceNumber : sequenceNumber - 1;

                Trace(context, $"{isReady}");

                return isReady;
            }

            // Enqueues 'operation' after the caller's own attempt did not complete it (or the queue was not ready).
            // 'observedSequenceNumber' is the value IsReady produced before that attempt; if the queue is Ready but
            // the sequence number has changed since, the operation is retried here instead of being enqueued.
            // Registers the context first if it is not registered yet; a failed registration either completes the
            // operation on retry (EPIPE case) or throws from HandleFailedRegistration.
            // When the operation is enqueued and 'cancellationToken' can be canceled, a registration that calls
            // operation.TryCancel() is stored in operation.CancellationRegistration.
            // Return true for pending, false for completed synchronously (including failure and abort)
            public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation, int observedSequenceNumber, CancellationToken cancellationToken = default)
            {
                Trace(context, $"Enter");

                if (!context.IsRegistered && !context.TryRegister(out Interop.Error error))
                {
                    HandleFailedRegistration(context, operation, error);

                    Trace(context, "Leave, not registered");
                    return false;
                }

                // Each iteration either enqueues the operation (return true), aborts it because the queue is
                // stopped (return false), or retries it outside the lock and loops if it is still pending.
                while (true)
                {
                    bool doAbort = false;
                    using (Lock())
                    {
                        switch (_state)
                        {
                            case QueueState.Ready:
                                if (observedSequenceNumber != _sequenceNumber)
                                {
                                    // The queue has become ready again since we previously checked it.
                                    // So, we need to retry the operation before we enqueue it.
                                    // NOTE(review): in the visible code _sequenceNumber is only ever incremented, so
                                    // observedSequenceNumber - _sequenceNumber is normally negative and this assert
                                    // looks like it cannot fire; the operands may be reversed -- confirm.
                                    Debug.Assert(observedSequenceNumber - _sequenceNumber < 10000, "Very large sequence number increase???");
                                    observedSequenceNumber = _sequenceNumber;
                                    break;
                                }

                                // Caller tried the operation and got an EWOULDBLOCK, so we need to transition.
                                _state = QueueState.Waiting;
                                goto case QueueState.Waiting;

                            case QueueState.Waiting:
                            case QueueState.Processing:
                                // Enqueue the operation.
                                Debug.Assert(operation.Next == operation, "Expected operation.Next == operation");

                                if (_tail == null)
                                {
                                    // Queue was empty: the new operation becomes the head (it already points to itself).
                                    Debug.Assert(!_isNextOperationSynchronous);
                                    _isNextOperationSynchronous = operation.Event != null;
                                }
                                else
                                {
                                    // Append after the current tail, keeping the list circular (new tail -> head).
                                    operation.Next = _tail.Next;
                                    _tail.Next = operation;
                                }

                                _tail = operation;
                                Trace(context, $"Leave, enqueued {IdOf(operation)}");

                                // Now that the object is enqueued, hook up cancellation.
                                // Note that it's possible the call to register itself could
                                // call TryCancel, so we do this after the op is fully enqueued.
                                if (cancellationToken.CanBeCanceled)
                                {
                                    operation.CancellationRegistration = cancellationToken.UnsafeRegister(s => ((TOperation)s!).TryCancel(), operation);
                                }

                                return true;

                            case QueueState.Stopped:
                                Debug.Assert(_tail == null);
                                // The abort itself is performed after the lock has been released.
                                doAbort = true;
                                break;

                            default:
                                Environment.FailFast("unexpected queue state");
                                break;
                        }
                    }

                    if (doAbort)
                    {
                        operation.DoAbort();
                        Trace(context, $"Leave, queue stopped");
                        return false;
                    }

                    // Retry the operation.
                    if (operation.TryComplete(context) != OperationResult.Pending)
                    {
                        Trace(context, $"Leave, retry succeeded");
                        return false;
                    }
                }

                // Handles a failed TryRegister: returns normally only when the error is EPIPE and retrying the
                // operation finishes it; otherwise throws OutOfMemoryException (ENOMEM/ENOSPC) or InternalException.
                static void HandleFailedRegistration(SocketAsyncContext context, TOperation operation, Interop.Error error)
                {
                    Debug.Assert(error != Interop.Error.SUCCESS);

                    // macOS: kevent returns EPIPE when adding pipe fd for which the other end is closed.
                    if (error == Interop.Error.EPIPE)
                    {
                        // Because the other end close, we expect the operation to complete when we retry it.
                        // If it doesn't, we fall through and throw an Exception.
                        if (operation.TryComplete(context) != OperationResult.Pending)
                        {
                            return;
                        }
                    }

                    if (error == Interop.Error.ENOMEM || error == Interop.Error.ENOSPC)
                    {
                        throw new OutOfMemoryException();
                    }
                    else
                    {
                        throw new InternalException(error);
                    }
                }
            }

            // Reacts to a readiness notification for this queue.
            // - Ready / Processing: only bumps _sequenceNumber (so in-flight attempts notice and retry); returns null.
            // - Stopped: returns null.
            // - Waiting: looks at the head operation. If it is synchronous, the state becomes Processing, its
            //   event is set (the waiting thread does the processing) and null is returned. If it is asynchronous,
            //   the state becomes Processing and the operation is returned for the caller to process -- unless
            //   skipAsyncEvents is true, in which case it is returned with no state change at all.
            public AsyncOperation? ProcessSyncEventOrGetAsyncEvent(SocketAsyncContext context, bool skipAsyncEvents = false)
            {
                AsyncOperation op;
                using (Lock())
                {
                    Trace(context, $"Enter");

                    switch (_state)
                    {
                        case QueueState.Ready:
                            Debug.Assert(_tail == null, "State == Ready but queue is not empty!");
                            _sequenceNumber++;
                            Trace(context, $"Exit (previously ready)");
                            return null;

                        case QueueState.Waiting:
                            Debug.Assert(_tail != null, "State == Waiting but queue is empty!");
                            // _tail.Next is the head of the circular list.
                            op = _tail.Next;
                            Debug.Assert(_isNextOperationSynchronous == (op.Event != null));
                            if (skipAsyncEvents && !_isNextOperationSynchronous)
                            {
                                // Return the operation to indicate that the async operation was not processed, without making
                                // any state changes because async operations are being skipped
                                return op;
                            }

                            _state = QueueState.Processing;
                            // Break out and release lock
                            break;

                        case QueueState.Processing:
                            Debug.Assert(_tail != null, "State == Processing but queue is empty!");
                            _sequenceNumber++;
                            Trace(context, $"Exit (currently processing)");
                            return null;

                        case QueueState.Stopped:
                            Debug.Assert(_tail == null);
                            Trace(context, $"Exit (stopped)");
                            return null;

                        default:
                            Environment.FailFast("unexpected queue state");
                            return null;
                    }
                }

                // Only the Waiting -> Processing transition reaches this point; the lock has been released.
                ManualResetEventSlim? e = op.Event;
                if (e != null)
                {
                    // Sync operation.  Signal waiting thread to continue processing.
                    e.Set();
                    return null;
                }
                else
                {
                    // Async operation.  The caller will figure out how to process the IO.
                    Debug.Assert(!skipAsyncEvents);
                    return op;
                }
            }

            // Processes an asynchronous operation at the head of the queue and, if it completed,
            // unregisters it from cancellation and invokes its callback.
            internal void ProcessAsyncOperation(TOperation op)
            {
                OperationResult outcome = ProcessQueuedOperation(op);

                Debug.Assert(op.Event == null, "Sync operation encountered in ProcessAsyncOperation");

                if (outcome != OperationResult.Completed)
                {
                    // Pending or cancelled: nothing to report from here.
                    return;
                }

                // The operation has completed and has been removed from the queue, so no one else
                // holds a reference to it. Unregister from cancellation first: this must happen
                // before the callback may return the object to the pool, or else a cancellation
                // request for a previous operation could affect a subsequent one. Then invoke
                // the callback and let it pool the object if appropriate.
                op.CancellationRegistration.Dispose();
                op.InvokeCallback(allowPooling: true);
            }

            // Tries to complete 'op', which must be at the head of the queue while the queue is Processing.
            // Returns:
            // - Cancelled if the queue is found Stopped before the operation finished;
            // - Pending if TryComplete returned Pending and no notification arrived meanwhile (state -> Waiting);
            // - otherwise the non-Pending result of TryComplete, after removing 'op' from the queue and
            //   dispatching the next queued operation, if any (or moving to Ready if the queue became empty).
            // TryComplete and Dispatch are always called with the queue lock released.
            public OperationResult ProcessQueuedOperation(TOperation op)
            {
                SocketAsyncContext context = op.AssociatedContext;

                int observedSequenceNumber;
                using (Lock())
                {
                    Trace(context, $"Enter");

                    if (_state == QueueState.Stopped)
                    {
                        Debug.Assert(_tail == null);
                        Trace(context, $"Exit (stopped)");
                        return OperationResult.Cancelled;
                    }
                    else
                    {
                        Debug.Assert(_state == QueueState.Processing, $"_state={_state} while processing queue!");
                        Debug.Assert(_tail != null, "Unexpected empty queue while processing I/O");
                        Debug.Assert(op == _tail.Next, "Operation is not at head of queue???");
                        // Remember the sequence number before attempting the operation so that a
                        // notification arriving during the attempt can be detected afterwards.
                        observedSequenceNumber = _sequenceNumber;
                    }
                }

                OperationResult result;
                while (true)
                {
                    result = op.TryComplete(context);
                    if (result != OperationResult.Pending)
                    {
                        break;
                    }

                    // Check for retry and reset queue state.

                    using (Lock())
                    {
                        if (_state == QueueState.Stopped)
                        {
                            Debug.Assert(_tail == null);
                            Trace(context, $"Exit (stopped)");
                            return OperationResult.Cancelled;
                        }
                        else
                        {
                            Debug.Assert(_state == QueueState.Processing, $"_state={_state} while processing queue!");

                            if (observedSequenceNumber != _sequenceNumber)
                            {
                                // We received another epoll notification since we previously checked it.
                                // So, we need to retry the operation.
                                // NOTE(review): in the visible code _sequenceNumber is only ever incremented, so
                                // observedSequenceNumber - _sequenceNumber is normally negative and this assert
                                // looks like it cannot fire; the operands may be reversed -- confirm.
                                Debug.Assert(observedSequenceNumber - _sequenceNumber < 10000, "Very large sequence number increase???");
                                observedSequenceNumber = _sequenceNumber;
                            }
                            else
                            {
                                _state = QueueState.Waiting;
                                Trace(context, $"Exit (received EAGAIN)");
                                return OperationResult.Pending;
                            }
                        }
                    }
                }

                // Remove the op from the queue and see if there's more to process.

                AsyncOperation? nextOp = null;
                using (Lock())
                {
                    if (_state == QueueState.Stopped)
                    {
                        // Nothing to remove; note that 'result' (not Cancelled) is still returned below.
                        Debug.Assert(_tail == null);
                        Trace(context, $"Exit (stopped)");
                    }
                    else
                    {
                        Debug.Assert(_state == QueueState.Processing, $"_state={_state} while processing queue!");
                        Debug.Assert(_tail.Next == op, "Queue modified while processing queue");

                        if (op == _tail)
                        {
                            // No more operations to process
                            _tail = null;
                            _isNextOperationSynchronous = false;
                            _state = QueueState.Ready;
                            _sequenceNumber++;
                            Trace(context, $"Exit (finished queue)");
                        }
                        else
                        {
                            // Pop current operation and advance to next
                            nextOp = _tail.Next = op.Next;
                            _isNextOperationSynchronous = nextOp.Event != null;
                        }
                    }
                }

                // The state stays Processing when there is a next operation; hand it off outside the lock.
                nextOp?.Dispatch();

                Debug.Assert(result != OperationResult.Pending);
                return result;
            }

            // Removes a synchronous operation from the queue (PerformSyncOperation calls this when its wait
            // times out) and keeps the queue moving:
            // - if 'op' was the head and the queue is Processing, the next operation (if any) is dispatched;
            // - if removing 'op' empties the queue while Processing or Waiting, the state becomes Ready
            //   and the sequence number is bumped;
            // - if 'op' was not the head, it is simply unlinked.
            // Does nothing if the queue is already Stopped.
            public void CancelAndContinueProcessing(TOperation op)
            {
                // Note, only sync operations use this method.
                Debug.Assert(op.Event != null);

                // Remove operation from queue.
                // Note it must be there since it can only be processed and removed by the caller.
                AsyncOperation? nextOp = null;
                using (Lock())
                {
                    if (_state == QueueState.Stopped)
                    {
                        Debug.Assert(_tail == null);
                    }
                    else
                    {
                        Debug.Assert(_tail != null, "Unexpected empty queue in CancelAndContinueProcessing");

                        if (_tail.Next == op)
                        {
                            // We're the head of the queue
                            if (op == _tail)
                            {
                                // No more operations
                                _tail = null;
                                _isNextOperationSynchronous = false;
                            }
                            else
                            {
                                // Pop current operation and advance to next
                                _tail.Next = op.Next;
                                _isNextOperationSynchronous = op.Next.Event != null;
                            }

                            // We're the first op in the queue.
                            if (_state == QueueState.Processing)
                            {
                                // The queue has already handed off execution responsibility to us.
                                // We need to dispatch to the next op.
                                if (_tail == null)
                                {
                                    _state = QueueState.Ready;
                                    _sequenceNumber++;
                                }
                                else
                                {
                                    nextOp = _tail.Next;
                                }
                            }
                            else if (_state == QueueState.Waiting)
                            {
                                // Nobody is processing; only an emptied queue needs a state change.
                                if (_tail == null)
                                {
                                    _state = QueueState.Ready;
                                    _sequenceNumber++;
                                }
                            }
                        }
                        else
                        {
                            // We're not the head of the queue.
                            // Just find this op and remove it.
                            // Walk from the head until 'current' is the predecessor of 'op'.
                            AsyncOperation current = _tail.Next;
                            while (current.Next != op)
                            {
                                current = current.Next;
                            }

                            // If 'op' is the tail, its predecessor becomes the new tail.
                            if (current.Next == _tail)
                            {
                                _tail = current;
                            }
                            current.Next = current.Next.Next;
                        }
                    }
                }

                // Dispatch outside the lock.
                nextOp?.Dispatch();
            }

            // Called when the socket is closed.
            // Moves the queue to Stopped, calls TryCancel on every queued operation and empties the queue.
            // Returns true if at least one TryCancel call returned true.
            public bool StopAndAbort(SocketAsyncContext context)
            {
                // We should be called exactly once, by SafeSocketHandle.
                Debug.Assert(_state != QueueState.Stopped);

                bool anyAborted = false;

                using (Lock())
                {
                    Trace(context, $"Enter");

                    Debug.Assert(_state != QueueState.Stopped);

                    _state = QueueState.Stopped;

                    if (_tail != null)
                    {
                        // Walk the whole circular list once, starting at the tail.
                        AsyncOperation current = _tail;
                        do
                        {
                            if (current.TryCancel())
                            {
                                anyAborted = true;
                            }

                            current = current.Next;
                        } while (current != _tail);
                    }

                    _isNextOperationSynchronous = false;
                    _tail = null;

                    Trace(context, $"Exit");
                }

                return anyAborted;
            }

            // Emits a trace line tagged with the context id, the queue kind ("recv"/"send"), the calling
            // member, and the current queue state. Calls are compiled only when SOCKETASYNCCONTEXT_TRACE is defined.
            [Conditional("SOCKETASYNCCONTEXT_TRACE")]
            public void Trace(SocketAsyncContext context, string message, [CallerMemberName] string? memberName = null)
            {
                string queueType;
                if (typeof(TOperation) == typeof(ReadOperation))
                {
                    queueType = "recv";
                }
                else if (typeof(TOperation) == typeof(WriteOperation))
                {
                    queueType = "send";
                }
                else
                {
                    queueType = "???";
                }

                OutputTrace($"{IdOf(context)}-{queueType}.{memberName}: {message}, {_state}-{_sequenceNumber}, {((_tail == null) ? "empty" : "not empty")}");
            }
        }

        // Socket handle this context operates on; supplied to the constructor.
        internal readonly SafeSocketHandle _socket;
        // Pending operations, one queue per direction (Accept, for example, goes through the receive queue).
        private OperationQueue<ReadOperation> _receiveQueue;
        private OperationQueue<WriteOperation> _sendQueue;
        // Engine this context is registered with; written by TryRegister on success.
        private SocketAsyncEngine? _asyncEngine;
        // True once an engine has been recorded in _asyncEngine.
        private bool IsRegistered => _asyncEngine != null;
        private bool _isHandleNonBlocking = OperatingSystem.IsWasi(); // WASI sockets are always non-blocking, because we don't have another thread which could be blocked
        /// <summary>An index into <see cref="SocketAsyncEngine"/>'s table of all contexts that are currently <see cref="IsRegistered"/>.</summary>
        internal int GlobalContextIndex = -1;

        // Held by TryRegister while it checks and sets _asyncEngine.
        private readonly object _registerLock = new object();

        // Creates the context for the given socket handle and initializes both operation queues.
        public SocketAsyncContext(SafeSocketHandle socket)
        {
            _socket = socket;

            // The queues are structs; Init creates their lock objects and puts them in the Ready state.
            _receiveQueue.Init();
            _sendQueue.Init();
        }

        // Forwards to the PreferInlineCompletions value of the underlying socket handle.
        public bool PreferInlineCompletions
        {
            // Socket.PreferInlineCompletions is an experimental API with internal access modifier.
            // DynamicDependency ensures the setter is available externally using reflection.
            [DynamicDependency("set_PreferInlineCompletions", typeof(Socket))]
            get
            {
                return _socket.PreferInlineCompletions;
            }
        }

        // Registers this context with a SocketAsyncEngine unless that has already happened.
        // Returns true (with error = SUCCESS) if already registered, true if the registration
        // succeeded, and false with the engine-reported error otherwise.
        private bool TryRegister(out Interop.Error error)
        {
            Debug.Assert(_isHandleNonBlocking);

            lock (_registerLock)
            {
                if (_asyncEngine != null)
                {
                    // Someone else registered first; nothing to do.
                    error = Interop.Error.SUCCESS;
                    return true;
                }

                // Hold a reference on the handle while its raw value is in use.
                bool handleRefAdded = false;
                try
                {
                    _socket.DangerousAddRef(ref handleRefAdded);
                    IntPtr rawHandle = _socket.DangerousGetHandle();

                    if (!SocketAsyncEngine.TryRegisterSocket(rawHandle, this, out SocketAsyncEngine? engine, out error))
                    {
                        Trace("Registration failed");
                        return false;
                    }

                    Volatile.Write(ref _asyncEngine, engine);

                    Trace("Registered");
                    return true;
                }
                finally
                {
                    if (handleRefAdded)
                    {
                        _socket.DangerousRelease();
                    }
                }
            }
        }

        // Stops both operation queues and unregisters the context from its engine if it was registered.
        // Returns true if either queue reported an aborted operation.
        public bool StopAndAbort()
        {
            // Drain queues: send first, then receive. Both are always stopped.
            bool sendAborted = _sendQueue.StopAndAbort(this);
            bool receiveAborted = _receiveQueue.StopAndAbort(this);

            // We don't need to synchronize with Register.
            // This method is called when the handle gets released.
            // The Register method will throw ODE when it tries to use the handle at this point.
            if (IsRegistered)
            {
                SocketAsyncEngine.UnregisterSocket(this);
            }

            return sendAborted || receiveAborted;
        }

        // Switches the socket handle to non-blocking mode if that has not been done yet.
        // Throws SocketException when the mode change fails.
        public void SetHandleNonBlocking()
        {
            if (OperatingSystem.IsWasi())
            {
                // WASI sockets are always non-blocking, because in ST we don't have another thread which could be blocked
                return;
            }

            //
            // Our sockets may start as blocking, and later transition to non-blocking, either because the user
            // explicitly requested non-blocking mode, or because we need non-blocking mode to support async
            // operations.  We never transition back to blocking mode, to avoid problems synchronizing that
            // transition with the async infrastructure.
            //
            // Note that there's no synchronization here, so we may set the non-blocking option multiple times
            // in a race.  This should be fine.
            //
            if (_isHandleNonBlocking)
            {
                return;
            }

            int rc = Interop.Sys.Fcntl.SetIsNonBlocking(_socket, 1);
            if (rc != 0)
            {
                throw new SocketException((int)SocketPal.GetSocketErrorForErrorCode(Interop.Sys.GetLastError()));
            }

            _isHandleNonBlocking = true;
        }

        // Whether the handle has been put into non-blocking mode (see SetHandleNonBlocking).
        public bool IsHandleNonBlocking
        {
            get { return _isHandleNonBlocking; }
        }

        // Runs 'operation' synchronously through 'queue': the operation is enqueued with an event attached,
        // and the calling thread blocks on that event until it is signalled to process the operation itself.
        //
        // queue                  - the queue the operation belongs to.
        // operation              - the operation to perform; its Event is set to a local event for the duration.
        // timeout                - -1 to wait indefinitely, otherwise a positive number of milliseconds that
        //                          bounds the total time spent waiting across all retries.
        // observedSequenceNumber - the value obtained from queue.IsReady before the caller's own attempt.
        //
        // On timeout the operation is removed from the queue and its ErrorCode is set to SocketError.TimedOut.
        // Throws PlatformNotSupportedException when the OS does not support threads.
        private void PerformSyncOperation<TOperation>(ref OperationQueue<TOperation> queue, TOperation operation, int timeout, int observedSequenceNumber)
            where TOperation : AsyncOperation
        {
            if (!Socket.OSSupportsThreads) throw new PlatformNotSupportedException();
            Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}");

            using (var e = new ManualResetEventSlim(false, 0))
            {
                operation.Event = e;

                if (!queue.StartAsyncOperation(this, operation, observedSequenceNumber))
                {
                    // Completed synchronously
                    return;
                }

                bool timeoutExpired = false;
                while (true)
                {
                    // Use a monotonic clock so that wall-clock adjustments cannot distort the remaining timeout.
                    long waitStart = Environment.TickCount64;

                    if (!e.Wait(timeout))
                    {
                        timeoutExpired = true;
                        break;
                    }

                    // Reset the event now to avoid lost notifications if the processing is unsuccessful.
                    e.Reset();

                    // We've been signalled to try to process the operation.
                    OperationResult result = queue.ProcessQueuedOperation(operation);
                    if (result == OperationResult.Completed ||
                        result == OperationResult.Cancelled)
                    {
                        break;
                    }

                    // Couldn't process the operation.
                    // Adjust timeout and try again.
                    if (timeout > 0)
                    {
                        // Subtract the TOTAL elapsed milliseconds. (TimeSpan.Milliseconds would only be the
                        // 0-999 ms component, under-counting any wait of a second or more.)
                        long elapsedMilliseconds = Environment.TickCount64 - waitStart;

                        if (elapsedMilliseconds >= timeout)
                        {
                            timeoutExpired = true;
                            break;
                        }

                        // Safe narrowing: 0 <= elapsedMilliseconds < timeout <= int.MaxValue.
                        timeout -= (int)elapsedMilliseconds;
                    }
                }

                if (timeoutExpired)
                {
                    queue.CancelAndContinueProcessing(operation);
                    operation.ErrorCode = SocketError.TimedOut;
                }
            }
        }

        // Decides whether a synchronous operation should be retried.
        // Non-blocking handle: returns true (errorCode is a placeholder).
        // Blocking handle: returns false with errorCode = TimedOut.
        private bool ShouldRetrySyncOperation(out SocketError errorCode)
        {
            if (!_isHandleNonBlocking)
            {
                // We are in blocking mode, so the EAGAIN we received indicates a timeout.
                errorCode = SocketError.TimedOut;
                return false;
            }

            errorCode = SocketError.Success;    // Will be ignored
            return true;
        }

        // Processes a queued asynchronous read operation through the receive queue.
        private void ProcessAsyncReadOperation(ReadOperation op)
        {
            _receiveQueue.ProcessAsyncOperation(op);
        }

        // Processes a queued asynchronous write operation through the send queue.
        private void ProcessAsyncWriteOperation(WriteOperation op)
        {
            _sendQueue.ProcessAsyncOperation(op);
        }

        // Synchronous accept.
        // First tries to accept immediately when the receive queue is ready; otherwise performs the accept
        // as a synchronous queued operation with no timeout and reports that operation's results.
        public SocketError Accept(Memory<byte> socketAddress, out int socketAddressLen, out IntPtr acceptedFd)
        {
            Debug.Assert(socketAddress.Length > 0, $"Unexpected socketAddressLen: {socketAddress.Length}");

            // Fast path: the out parameters are filled in by TryCompleteAccept when it reports completion.
            if (_receiveQueue.IsReady(this, out int observedSequenceNumber) &&
                SocketPal.TryCompleteAccept(_socket, socketAddress, out socketAddressLen, out acceptedFd, out SocketError errorCode))
            {
                Debug.Assert(errorCode == SocketError.Success || acceptedFd == (IntPtr)(-1), $"Unexpected values: errorCode={errorCode}, acceptedFd={acceptedFd}");
                return errorCode;
            }

            // Slow path: go through the receive queue and block until the operation is done.
            AcceptOperation acceptOp = new AcceptOperation(this);
            acceptOp.SocketAddress = socketAddress;

            PerformSyncOperation(ref _receiveQueue, acceptOp, -1, observedSequenceNumber);

            acceptedFd = acceptOp.AcceptedFileDescriptor;
            socketAddressLen = acceptOp.SocketAddress.Length;
            return acceptOp.ErrorCode;
        }

        // Asynchronous accept. Switches the handle to non-blocking mode, then tries to accept inline when
        // the receive queue is ready. If that does not complete, a rented AcceptOperation is started on the
        // receive queue: SocketError.IOPending is returned when it was queued (the callback is set on the
        // operation), otherwise the operation's own results are returned and the operation is handed back
        // via ReturnOperation.
        public SocketError AcceptAsync(Memory<byte> socketAddress, out int socketAddressLen, out IntPtr acceptedFd, Action<IntPtr, Memory<byte>, SocketError> callback, CancellationToken cancellationToken)
        {
            Debug.Assert(socketAddress.Length > 0, $"Unexpected socketAddressLen: {socketAddress.Length}");
            Debug.Assert(callback != null, "Expected non-null callback");

            SetHandleNonBlocking();

            if (_receiveQueue.IsReady(this, out int sequenceNumber))
            {
                if (SocketPal.TryCompleteAccept(_socket, socketAddress, out socketAddressLen, out acceptedFd, out SocketError errorCode))
                {
                    Debug.Assert(errorCode == SocketError.Success || acceptedFd == (IntPtr)(-1), $"Unexpected values: errorCode={errorCode}, acceptedFd={acceptedFd}");

                    return errorCode;
                }
            }

            AcceptOperation acceptOperation = RentAcceptOperation();
            acceptOperation.Callback = callback;
            acceptOperation.SocketAddress = socketAddress;

            if (_receiveQueue.StartAsyncOperation(this, acceptOperation, sequenceNumber, cancellationToken))
            {
                // Queued; nothing to report yet.
                acceptedFd = (IntPtr)(-1);
                socketAddressLen = 0;
                return SocketError.IOPending;
            }

            // The operation was not left pending: capture its results before returning it.
            socketAddressLen = acceptOperation.SocketAddress.Length;
            acceptedFd = acceptOperation.AcceptedFileDescriptor;
            SocketError result = acceptOperation.ErrorCode;

            ReturnOperation(acceptOperation);
            return result;
        }

        // Synchronous connect. Throws PlatformNotSupportedException when Socket.OSSupportsThreads is false.
        public SocketError Connect(Memory<byte> socketAddress)
        {
            if (!Socket.OSSupportsThreads)
            {
                throw new PlatformNotSupportedException();
            }

            Debug.Assert(socketAddress.Length > 0, $"Unexpected socketAddressLen: {socketAddress.Length}");

            // Connect is different than the usual "readiness" pattern of other operations.
            // The connect has to be initiated with the OS via TryStartConnect before it can be
            // completed via epoll notification, so TryStartConnect is always called regardless
            // of readiness; IsReady is only used to capture the sequence number.
            _sendQueue.IsReady(this, out int sequenceNumber);

            bool finished = SocketPal.TryStartConnect(_socket, socketAddress, out SocketError startResult);
            if (!finished)
            {
                finished = !ShouldRetrySyncOperation(out startResult);
            }

            if (finished)
            {
                _socket.RegisterConnectResult(startResult);
                return startResult;
            }

            var connectOperation = new ConnectOperation(this)
            {
                SocketAddress = socketAddress,
            };
            PerformSyncOperation(ref _sendQueue, connectOperation, -1, sequenceNumber);

            return connectOperation.ErrorCode;
        }

        // Asynchronous connect that can also send initial data from buffer.
        //
        // Outcomes:
        //  - TryStartConnect finished immediately: the connect result is registered on the socket. If it
        //    succeeded and part of buffer is still unsent, the remainder is handed to SendToAsync and that
        //    call's result is returned; otherwise the connect result is returned.
        //  - Otherwise a ConnectOperation is started on the send queue. If StartAsyncOperation returns
        //    false, the operation's error code is returned (sentBytes is increased by the operation's
        //    BytesTransferred on success); if it returns true, SocketError.IOPending is returned.
        //
        // The caller's cancellationToken is used both for the queued connect and for the follow-up send.
        public SocketError ConnectAsync(Memory<byte> socketAddress, Action<int, Memory<byte>, SocketFlags, SocketError> callback, Memory<byte> buffer, out int sentBytes, CancellationToken cancellationToken)
        {
            Debug.Assert(socketAddress.Length > 0, $"Unexpected socketAddressLen: {socketAddress.Length}");
            Debug.Assert(callback != null, "Expected non-null callback");

            SetHandleNonBlocking();

            // Connect is different than the usual "readiness" pattern of other operations.
            // We need to initiate the connect before we try to complete it.
            // Thus, always call TryStartConnect regardless of readiness.
            SocketError errorCode;
            int observedSequenceNumber;
            _sendQueue.IsReady(this, out observedSequenceNumber);
            // NOTE(review): the symbol below is spelled "PLATFROM"; presumably it has to match the build
            // definition exactly - confirm before renaming.
#if SYSTEM_NET_SOCKETS_APPLE_PLATFROM
            if (SocketPal.TryStartConnect(_socket, socketAddress, out errorCode, buffer.Span, _socket.TfoEnabled, out sentBytes))
#else
            if (SocketPal.TryStartConnect(_socket, socketAddress, out errorCode, buffer.Span, false, out sentBytes)) // In Linux, we can figure it out as needed inside PAL.
#endif
            {
                _socket.RegisterConnectResult(errorCode);

                int remains = buffer.Length - sentBytes;

                if (errorCode == SocketError.Success && remains > 0)
                {
                    // Send whatever was not sent as part of the connect. Flow the caller's token so this
                    // send can be canceled, the same way the queued ConnectOperation below can be.
                    errorCode = SendToAsync(buffer.Slice(sentBytes), 0, remains, SocketFlags.None, Memory<byte>.Empty, ref sentBytes, callback!, cancellationToken);
                }
                return errorCode;
            }

            var operation = new ConnectOperation(this)
            {
                Callback = callback,
                SocketAddress = socketAddress,
                Buffer = buffer.Slice(sentBytes),
                BytesTransferred = sentBytes,
            };

            if (!_sendQueue.StartAsyncOperation(this, operation, observedSequenceNumber, cancellationToken))
            {
                if (operation.ErrorCode == SocketError.Success)
                {
                    sentBytes += operation.BytesTransferred;
                }
                return operation.ErrorCode;
            }

            return SocketError.IOPending;
        }

        // Synchronous receive into a Memory<byte>; forwards to ReceiveFrom with an empty socket address
        // and discards the reported address length.
        public SocketError Receive(Memory<byte> buffer, SocketFlags flags, int timeout, out int bytesReceived) =>
            ReceiveFrom(buffer, ref flags, Memory<byte>.Empty, out int _, timeout, out bytesReceived);

        // Synchronous receive into a Span<byte>; forwards to ReceiveFrom with an empty socket address
        // and discards the reported address length.
        public SocketError Receive(Span<byte> buffer, SocketFlags flags, int timeout, out int bytesReceived) =>
            ReceiveFrom(buffer, ref flags, Memory<byte>.Empty, out int _, timeout, out bytesReceived);

        // Asynchronous receive that also reports received flags; forwards to ReceiveFromAsync with an
        // empty socket address and discards the reported address length.
        public SocketError ReceiveAsync(Memory<byte> buffer, SocketFlags flags, out int bytesReceived, out SocketFlags receivedFlags, Action<int, Memory<byte>, SocketFlags, SocketError> callback, CancellationToken cancellationToken) =>
            ReceiveFromAsync(buffer, flags, Memory<byte>.Empty, out int _, out bytesReceived, out receivedFlags, callback, cancellationToken);

        // Synchronous receive-from into a Memory<byte>. When the receive queue is ready an immediate
        // receive is attempted; if it completes, or ShouldRetrySyncOperation says not to retry, that
        // result is returned. Otherwise a BufferMemoryReceiveOperation is run through
        // PerformSyncOperation with the supplied timeout. flags is updated with the received flags.
        // Throws PlatformNotSupportedException when Socket.OSSupportsThreads is false.
        public SocketError ReceiveFrom(Memory<byte> buffer, ref SocketFlags flags, Memory<byte> socketAddress, out int socketAddressLen, int timeout, out int bytesReceived)
        {
            if (!Socket.OSSupportsThreads)
            {
                throw new PlatformNotSupportedException();
            }

            Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}");

            if (_receiveQueue.IsReady(this, out int sequenceNumber))
            {
                bool completed = SocketPal.TryCompleteReceiveFrom(_socket, buffer.Span, flags, socketAddress.Span, out socketAddressLen, out bytesReceived, out SocketFlags immediateFlags, out SocketError immediateError);
                if (completed || !ShouldRetrySyncOperation(out immediateError))
                {
                    flags = immediateFlags;
                    return immediateError;
                }
            }

            var receiveOperation = new BufferMemoryReceiveOperation(this)
            {
                Buffer = buffer,
                Flags = flags,
                SetReceivedFlags = true,
                SocketAddress = socketAddress,
            };
            PerformSyncOperation(ref _receiveQueue, receiveOperation, timeout, sequenceNumber);

            socketAddressLen = receiveOperation.SocketAddress.Length;
            bytesReceived = receiveOperation.BytesTransferred;
            flags = receiveOperation.ReceivedFlags;
            return receiveOperation.ErrorCode;
        }

        // Synchronous receive-from into a Span<byte>. When the receive queue is ready an immediate
        // receive is attempted; if it completes, or ShouldRetrySyncOperation says not to retry, that
        // result is returned. Otherwise the span is pinned and a BufferPtrReceiveOperation is run through
        // PerformSyncOperation with the supplied timeout. flags is updated with the received flags.
        // Throws PlatformNotSupportedException when Socket.OSSupportsThreads is false.
        public unsafe SocketError ReceiveFrom(Span<byte> buffer, ref SocketFlags flags, Memory<byte> socketAddress, out int socketAddressLen, int timeout, out int bytesReceived)
        {
            if (!Socket.OSSupportsThreads) throw new PlatformNotSupportedException();

            // Same timeout precondition the other synchronous operations in this class assert.
            Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}");

            SocketFlags receivedFlags;
            SocketError errorCode;
            int observedSequenceNumber;
            if (_receiveQueue.IsReady(this, out observedSequenceNumber) &&
                (SocketPal.TryCompleteReceiveFrom(_socket, buffer, flags, socketAddress.Span, out socketAddressLen, out bytesReceived, out receivedFlags, out errorCode) ||
                !ShouldRetrySyncOperation(out errorCode)))
            {
                flags = receivedFlags;
                return errorCode;
            }

            // The operation stores a raw pointer, so the buffer must stay pinned until
            // PerformSyncOperation has returned.
            fixed (byte* bufferPtr = &MemoryMarshal.GetReference(buffer))
            {
                var operation = new BufferPtrReceiveOperation(this)
                {
                    BufferPtr = bufferPtr,
                    Length = buffer.Length,
                    Flags = flags,
                    SocketAddress = socketAddress,
                };

                PerformSyncOperation(ref _receiveQueue, operation, timeout, observedSequenceNumber);

                flags = operation.ReceivedFlags;
                bytesReceived = operation.BytesTransferred;
                socketAddressLen = operation.SocketAddress.Length;
                return operation.ErrorCode;
            }
        }

        // Asynchronous receive that reports neither received flags nor a remote address. Switches the
        // handle to non-blocking mode and tries to receive inline when the receive queue is ready. If that
        // does not complete, a rented BufferMemoryReceiveOperation is started: SocketError.IOPending is
        // returned when it was queued, otherwise the operation's results are returned and the operation
        // is handed back via ReturnOperation.
        public SocketError ReceiveAsync(Memory<byte> buffer, SocketFlags flags, out int bytesReceived, Action<int, Memory<byte>, SocketFlags, SocketError> callback, CancellationToken cancellationToken = default)
        {
            SetHandleNonBlocking();

            if (_receiveQueue.IsReady(this, out int sequenceNumber))
            {
                if (SocketPal.TryCompleteReceive(_socket, buffer.Span, flags, out bytesReceived, out SocketError immediateResult))
                {
                    return immediateResult;
                }
            }

            BufferMemoryReceiveOperation receiveOperation = RentBufferMemoryReceiveOperation();
            receiveOperation.SetReceivedFlags = false;
            receiveOperation.Callback = callback;
            receiveOperation.Buffer = buffer;
            receiveOperation.Flags = flags;
            receiveOperation.SocketAddress = default;

            if (_receiveQueue.StartAsyncOperation(this, receiveOperation, sequenceNumber, cancellationToken))
            {
                bytesReceived = 0;
                return SocketError.IOPending;
            }

            // Not left pending: read the results before the operation is returned.
            bytesReceived = receiveOperation.BytesTransferred;
            SocketError result = receiveOperation.ErrorCode;

            ReturnOperation(receiveOperation);
            return result;
        }

        // Asynchronous receive-from into a Memory<byte>. Switches the handle to non-blocking mode and
        // tries to receive inline when the receive queue is ready. If that does not complete, a rented
        // BufferMemoryReceiveOperation (with SetReceivedFlags = true) is started: SocketError.IOPending
        // is returned with zeroed outputs when it was queued, otherwise the operation's results are
        // returned and the operation is handed back via ReturnOperation.
        public SocketError ReceiveFromAsync(Memory<byte> buffer, SocketFlags flags, Memory<byte> socketAddress, out int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, Action<int, Memory<byte>, SocketFlags, SocketError> callback, CancellationToken cancellationToken = default)
        {
            SetHandleNonBlocking();

            if (_receiveQueue.IsReady(this, out int sequenceNumber))
            {
                if (SocketPal.TryCompleteReceiveFrom(_socket, buffer.Span, flags, socketAddress.Span, out socketAddressLen, out bytesReceived, out receivedFlags, out SocketError immediateResult))
                {
                    return immediateResult;
                }
            }

            BufferMemoryReceiveOperation receiveOperation = RentBufferMemoryReceiveOperation();
            receiveOperation.SetReceivedFlags = true;
            receiveOperation.Callback = callback;
            receiveOperation.Buffer = buffer;
            receiveOperation.Flags = flags;
            receiveOperation.SocketAddress = socketAddress;

            if (_receiveQueue.StartAsyncOperation(this, receiveOperation, sequenceNumber, cancellationToken))
            {
                receivedFlags = SocketFlags.None;
                socketAddressLen = 0;
                bytesReceived = 0;
                return SocketError.IOPending;
            }

            // Not left pending: read the results before the operation is returned.
            receivedFlags = receiveOperation.ReceivedFlags;
            bytesReceived = receiveOperation.BytesTransferred;
            SocketError result = receiveOperation.ErrorCode;
            socketAddressLen = receiveOperation.SocketAddress.Length;

            ReturnOperation(receiveOperation);
            return result;
        }

        // Synchronous receive into a list of buffers; forwards to ReceiveFrom with an empty socket
        // address and discards the reported address length.
        public SocketError Receive(IList<ArraySegment<byte>> buffers, SocketFlags flags, int timeout, out int bytesReceived) =>
            ReceiveFrom(buffers, ref flags, Memory<byte>.Empty, out int _, timeout, out bytesReceived);

        // Asynchronous receive into a list of buffers; forwards to ReceiveFromAsync with an empty socket
        // address and discards the reported address length.
        public SocketError ReceiveAsync(IList<ArraySegment<byte>> buffers, SocketFlags flags, out int bytesReceived, out SocketFlags receivedFlags, Action<int, Memory<byte>, SocketFlags, SocketError> callback) =>
            ReceiveFromAsync(buffers, flags, Memory<byte>.Empty, out int _, out bytesReceived, out receivedFlags, callback);

        // Synchronous receive-from into a list of buffers. When the receive queue is ready an immediate
        // receive is attempted; if it completes, or ShouldRetrySyncOperation says not to retry, that
        // result is returned. Otherwise a BufferListReceiveOperation is run through PerformSyncOperation
        // with the supplied timeout. flags is updated with the received flags.
        // Throws PlatformNotSupportedException when Socket.OSSupportsThreads is false.
        public SocketError ReceiveFrom(IList<ArraySegment<byte>> buffers, ref SocketFlags flags, Memory<byte> socketAddress, out int socketAddressLen, int timeout, out int bytesReceived)
        {
            if (!Socket.OSSupportsThreads)
            {
                throw new PlatformNotSupportedException();
            }

            Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}");

            if (_receiveQueue.IsReady(this, out int sequenceNumber))
            {
                bool completed = SocketPal.TryCompleteReceiveFrom(_socket, buffers, flags, socketAddress.Span, out socketAddressLen, out bytesReceived, out SocketFlags immediateFlags, out SocketError immediateError);
                if (completed || !ShouldRetrySyncOperation(out immediateError))
                {
                    flags = immediateFlags;
                    return immediateError;
                }
            }

            var receiveOperation = new BufferListReceiveOperation(this)
            {
                Buffers = buffers,
                Flags = flags,
                SocketAddress = socketAddress,
            };
            PerformSyncOperation(ref _receiveQueue, receiveOperation, timeout, sequenceNumber);

            bytesReceived = receiveOperation.BytesTransferred;
            flags = receiveOperation.ReceivedFlags;
            socketAddressLen = receiveOperation.SocketAddress.Length;
            return receiveOperation.ErrorCode;
        }

        // Asynchronous receive-from into a list of buffers. Switches the handle to non-blocking mode and
        // tries to receive inline when the receive queue is ready. If that does not complete, a rented
        // BufferListReceiveOperation is started (without a cancellation token): SocketError.IOPending is
        // returned with zeroed outputs when it was queued, otherwise the operation's results are returned
        // and the operation is handed back via ReturnOperation.
        public SocketError ReceiveFromAsync(IList<ArraySegment<byte>> buffers, SocketFlags flags, Memory<byte> socketAddress, out int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, Action<int, Memory<byte>, SocketFlags, SocketError> callback)
        {
            SetHandleNonBlocking();

            if (_receiveQueue.IsReady(this, out int sequenceNumber))
            {
                if (SocketPal.TryCompleteReceiveFrom(_socket, buffers, flags, socketAddress.Span, out socketAddressLen, out bytesReceived, out receivedFlags, out SocketError immediateResult))
                {
                    // Synchronous success or failure
                    return immediateResult;
                }
            }

            BufferListReceiveOperation receiveOperation = RentBufferListReceiveOperation();
            receiveOperation.Callback = callback;
            receiveOperation.Buffers = buffers;
            receiveOperation.Flags = flags;
            receiveOperation.SocketAddress = socketAddress;

            if (_receiveQueue.StartAsyncOperation(this, receiveOperation, sequenceNumber))
            {
                bytesReceived = 0;
                socketAddressLen = 0;
                receivedFlags = SocketFlags.None;
                return SocketError.IOPending;
            }

            // Not left pending: read the results before the operation is returned.
            socketAddressLen = receiveOperation.SocketAddress.Length;
            receivedFlags = receiveOperation.ReceivedFlags;
            bytesReceived = receiveOperation.BytesTransferred;
            SocketError result = receiveOperation.ErrorCode;

            ReturnOperation(receiveOperation);
            return result;
        }

        // Synchronous receive-message-from into a Memory<byte>, also producing IP packet information.
        // When the receive queue is ready an immediate receive is attempted; if it completes, or
        // ShouldRetrySyncOperation says not to retry, that result is returned. Otherwise a
        // ReceiveMessageFromOperation is run through PerformSyncOperation with the supplied timeout.
        // Throws PlatformNotSupportedException when Socket.OSSupportsThreads is false.
        public SocketError ReceiveMessageFrom(
            Memory<byte> buffer, ref SocketFlags flags, Memory<byte> socketAddress, out int socketAddressLen, bool isIPv4, bool isIPv6, int timeout, out IPPacketInformation ipPacketInformation, out int bytesReceived)
        {
            if (!Socket.OSSupportsThreads)
            {
                throw new PlatformNotSupportedException();
            }

            Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}");

            if (_receiveQueue.IsReady(this, out int sequenceNumber))
            {
                bool completed = SocketPal.TryCompleteReceiveMessageFrom(_socket, buffer.Span, null, flags, socketAddress, out socketAddressLen, isIPv4, isIPv6, out bytesReceived, out SocketFlags immediateFlags, out ipPacketInformation, out SocketError immediateError);
                if (completed || !ShouldRetrySyncOperation(out immediateError))
                {
                    flags = immediateFlags;
                    return immediateError;
                }
            }

            var messageOperation = new ReceiveMessageFromOperation(this)
            {
                Buffer = buffer,
                Buffers = null,
                Flags = flags,
                SocketAddress = socketAddress,
                IsIPv4 = isIPv4,
                IsIPv6 = isIPv6,
            };
            PerformSyncOperation(ref _receiveQueue, messageOperation, timeout, sequenceNumber);

            bytesReceived = messageOperation.BytesTransferred;
            ipPacketInformation = messageOperation.IPPacketInformation;
            flags = messageOperation.ReceivedFlags;
            socketAddressLen = messageOperation.SocketAddress.Length;
            return messageOperation.ErrorCode;
        }

        // Synchronous receive-message-from into a Span<byte>, also producing IP packet information.
        // When the receive queue is ready an immediate receive is attempted; if it completes, or
        // ShouldRetrySyncOperation says not to retry, that result is returned. Otherwise the span is
        // pinned and a BufferPtrReceiveMessageFromOperation is run through PerformSyncOperation.
        // Throws PlatformNotSupportedException when Socket.OSSupportsThreads is false.
        public unsafe SocketError ReceiveMessageFrom(
            Span<byte> buffer, ref SocketFlags flags, Memory<byte> socketAddress, out int socketAddressLen, bool isIPv4, bool isIPv6, int timeout, out IPPacketInformation ipPacketInformation, out int bytesReceived)
        {
            if (!Socket.OSSupportsThreads)
            {
                throw new PlatformNotSupportedException();
            }

            Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}");

            if (_receiveQueue.IsReady(this, out int sequenceNumber))
            {
                bool completed = SocketPal.TryCompleteReceiveMessageFrom(_socket, buffer, null, flags, socketAddress, out socketAddressLen, isIPv4, isIPv6, out bytesReceived, out SocketFlags immediateFlags, out ipPacketInformation, out SocketError immediateError);
                if (completed || !ShouldRetrySyncOperation(out immediateError))
                {
                    flags = immediateFlags;
                    return immediateError;
                }
            }

            // The operation stores a raw pointer, so keep the buffer pinned for the whole operation.
            fixed (byte* pinnedBuffer = &MemoryMarshal.GetReference(buffer))
            {
                var messageOperation = new BufferPtrReceiveMessageFromOperation(this)
                {
                    BufferPtr = pinnedBuffer,
                    Length = buffer.Length,
                    Flags = flags,
                    SocketAddress = socketAddress,
                    IsIPv4 = isIPv4,
                    IsIPv6 = isIPv6,
                };
                PerformSyncOperation(ref _receiveQueue, messageOperation, timeout, sequenceNumber);

                bytesReceived = messageOperation.BytesTransferred;
                ipPacketInformation = messageOperation.IPPacketInformation;
                flags = messageOperation.ReceivedFlags;
                socketAddressLen = messageOperation.SocketAddress.Length;
                return messageOperation.ErrorCode;
            }
        }

        // Asynchronous receive-message-from. Switches the handle to non-blocking mode and tries to
        // receive inline when the receive queue is ready. If that does not complete, a new
        // ReceiveMessageFromOperation is started on the receive queue: SocketError.IOPending is returned
        // with default outputs when it was queued, otherwise the operation's results are returned.
        public SocketError ReceiveMessageFromAsync(Memory<byte> buffer, IList<ArraySegment<byte>>? buffers, SocketFlags flags, Memory<byte> socketAddress, out int socketAddressLen, bool isIPv4, bool isIPv6, out int bytesReceived, out SocketFlags receivedFlags, out IPPacketInformation ipPacketInformation, Action<int, Memory<byte>, SocketFlags, IPPacketInformation, SocketError> callback, CancellationToken cancellationToken = default)
        {
            SetHandleNonBlocking();

            if (_receiveQueue.IsReady(this, out int sequenceNumber))
            {
                if (SocketPal.TryCompleteReceiveMessageFrom(_socket, buffer.Span, buffers, flags, socketAddress, out socketAddressLen, isIPv4, isIPv6, out bytesReceived, out receivedFlags, out ipPacketInformation, out SocketError immediateResult))
                {
                    return immediateResult;
                }
            }

            var messageOperation = new ReceiveMessageFromOperation(this)
            {
                Callback = callback,
                Buffer = buffer,
                Buffers = buffers,
                Flags = flags,
                SocketAddress = socketAddress,
                IsIPv4 = isIPv4,
                IsIPv6 = isIPv6,
            };

            if (_receiveQueue.StartAsyncOperation(this, messageOperation, sequenceNumber, cancellationToken))
            {
                receivedFlags = SocketFlags.None;
                socketAddressLen = 0;
                bytesReceived = 0;
                ipPacketInformation = default(IPPacketInformation);
                return SocketError.IOPending;
            }

            // Not left pending: surface the operation's results directly.
            socketAddressLen = messageOperation.SocketAddress.Length;
            receivedFlags = messageOperation.ReceivedFlags;
            ipPacketInformation = messageOperation.IPPacketInformation;
            bytesReceived = messageOperation.BytesTransferred;
            return messageOperation.ErrorCode;
        }

        // Synchronous send of a span; forwards to SendTo with an empty socket address.
        public SocketError Send(ReadOnlySpan<byte> buffer, SocketFlags flags, int timeout, out int bytesSent)
        {
            return SendTo(buffer, flags, Memory<byte>.Empty, timeout, out bytesSent);
        }

        // Synchronous send of an array segment; forwards to SendTo with an empty socket address.
        public SocketError Send(byte[] buffer, int offset, int count, SocketFlags flags, int timeout, out int bytesSent) =>
            SendTo(buffer, offset, count, flags, Memory<byte>.Empty, timeout, out bytesSent);

        // Asynchronous send; starts the byte count at zero and forwards to SendToAsync with an empty
        // socket address.
        public SocketError SendAsync(Memory<byte> buffer, int offset, int count, SocketFlags flags, out int bytesSent, Action<int, Memory<byte>, SocketFlags, SocketError> callback, CancellationToken cancellationToken)
        {
            // SendToAsync takes the counter by ref, so it has to be initialized first.
            bytesSent = 0;

            Memory<byte> noAddress = Memory<byte>.Empty;
            return SendToAsync(buffer, offset, count, flags, noAddress, ref bytesSent, callback, cancellationToken);
        }

        // Synchronous send-to from a byte array. When the send queue is ready an immediate send is
        // attempted; if it completes, or ShouldRetrySyncOperation says not to retry, that result is
        // returned. Otherwise a BufferMemorySendOperation seeded with the updated offset, count and byte
        // total is run through PerformSyncOperation with the supplied timeout.
        // Throws PlatformNotSupportedException when Socket.OSSupportsThreads is false.
        public SocketError SendTo(byte[] buffer, int offset, int count, SocketFlags flags, Memory<byte> socketAddress, int timeout, out int bytesSent)
        {
            if (!Socket.OSSupportsThreads)
            {
                throw new PlatformNotSupportedException();
            }

            Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}");

            bytesSent = 0;

            if (_sendQueue.IsReady(this, out int sequenceNumber))
            {
                bool completed = SocketPal.TryCompleteSendTo(_socket, buffer, ref offset, ref count, flags, socketAddress.Span, ref bytesSent, out SocketError immediateError);
                if (completed || !ShouldRetrySyncOperation(out immediateError))
                {
                    return immediateError;
                }
            }

            var sendOperation = new BufferMemorySendOperation(this)
            {
                Buffer = buffer,
                Offset = offset,
                Count = count,
                Flags = flags,
                SocketAddress = socketAddress,
                BytesTransferred = bytesSent
            };
            PerformSyncOperation(ref _sendQueue, sendOperation, timeout, sequenceNumber);

            bytesSent = sendOperation.BytesTransferred;
            return sendOperation.ErrorCode;
        }

        // Synchronous send-to from a ReadOnlySpan<byte>. When the send queue is ready an immediate send
        // is attempted; if it completes, or ShouldRetrySyncOperation says not to retry, that result is
        // returned. Otherwise the span is pinned and a BufferPtrSendOperation seeded with the updated
        // offset, count and byte total is run through PerformSyncOperation with the supplied timeout.
        // Throws PlatformNotSupportedException when Socket.OSSupportsThreads is false.
        public unsafe SocketError SendTo(ReadOnlySpan<byte> buffer, SocketFlags flags, Memory<byte> socketAddress, int timeout, out int bytesSent)
        {
            if (!Socket.OSSupportsThreads)
            {
                throw new PlatformNotSupportedException();
            }

            Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}");

            bytesSent = 0;
            int bufferIndexIgnored = 0;
            int offset = 0;
            int count = buffer.Length;

            if (_sendQueue.IsReady(this, out int sequenceNumber))
            {
                bool completed = SocketPal.TryCompleteSendTo(_socket, buffer, null, ref bufferIndexIgnored, ref offset, ref count, flags, socketAddress.Span, ref bytesSent, out SocketError immediateError);
                if (completed || !ShouldRetrySyncOperation(out immediateError))
                {
                    return immediateError;
                }
            }

            // The operation stores a raw pointer, so keep the buffer pinned for the whole operation.
            fixed (byte* pinnedBuffer = &MemoryMarshal.GetReference(buffer))
            {
                var sendOperation = new BufferPtrSendOperation(this)
                {
                    BufferPtr = pinnedBuffer,
                    Offset = offset,
                    Count = count,
                    Flags = flags,
                    SocketAddress = socketAddress,
                    BytesTransferred = bytesSent
                };
                PerformSyncOperation(ref _sendQueue, sendOperation, timeout, sequenceNumber);

                bytesSent = sendOperation.BytesTransferred;
                return sendOperation.ErrorCode;
            }
        }

        // Asynchronous send-to from a Memory<byte>. bytesSent is passed by ref and is used as the running
        // byte total. Switches the handle to non-blocking mode and tries to send inline when the send
        // queue is ready. If that does not complete, a rented BufferMemorySendOperation is started:
        // SocketError.IOPending is returned when it was queued (bytesSent is left as is), otherwise the
        // operation's results are returned and the operation is handed back via ReturnOperation.
        public SocketError SendToAsync(Memory<byte> buffer, int offset, int count, SocketFlags flags, Memory<byte> socketAddress, ref int bytesSent, Action<int, Memory<byte>, SocketFlags, SocketError> callback, CancellationToken cancellationToken = default)
        {
            SetHandleNonBlocking();

            if (_sendQueue.IsReady(this, out int sequenceNumber))
            {
                if (SocketPal.TryCompleteSendTo(_socket, buffer.Span, ref offset, ref count, flags, socketAddress.Span, ref bytesSent, out SocketError immediateResult))
                {
                    return immediateResult;
                }
            }

            BufferMemorySendOperation sendOperation = RentBufferMemorySendOperation();
            sendOperation.Callback = callback;
            sendOperation.Buffer = buffer;
            sendOperation.Offset = offset;
            sendOperation.Count = count;
            sendOperation.Flags = flags;
            sendOperation.SocketAddress = socketAddress;
            sendOperation.BytesTransferred = bytesSent;

            if (_sendQueue.StartAsyncOperation(this, sendOperation, sequenceNumber, cancellationToken))
            {
                return SocketError.IOPending;
            }

            // Not left pending: read the results before the operation is returned.
            bytesSent = sendOperation.BytesTransferred;
            SocketError result = sendOperation.ErrorCode;

            ReturnOperation(sendOperation);
            return result;
        }

        // Synchronous send of a list of buffers; forwards to SendTo with an empty socket address.
        public SocketError Send(IList<ArraySegment<byte>> buffers, SocketFlags flags, int timeout, out int bytesSent) =>
            SendTo(buffers, flags, Memory<byte>.Empty, timeout, out bytesSent);

        // Asynchronous send of a list of buffers; forwards to SendToAsync with an empty socket address.
        public SocketError SendAsync(IList<ArraySegment<byte>> buffers, SocketFlags flags, out int bytesSent, Action<int, Memory<byte>, SocketFlags, SocketError> callback) =>
            SendToAsync(buffers, flags, Memory<byte>.Empty, out bytesSent, callback);

        // Synchronous send-to from a list of buffers. When the send queue is ready an immediate send is
        // attempted; if it completes, or ShouldRetrySyncOperation says not to retry, that result is
        // returned. Otherwise a BufferListSendOperation seeded with the updated buffer index, offset and
        // byte total is run through PerformSyncOperation with the supplied timeout.
        // Throws PlatformNotSupportedException when Socket.OSSupportsThreads is false.
        public SocketError SendTo(IList<ArraySegment<byte>> buffers, SocketFlags flags, Memory<byte> socketAddress, int timeout, out int bytesSent)
        {
            if (!Socket.OSSupportsThreads)
            {
                throw new PlatformNotSupportedException();
            }

            Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}");

            bytesSent = 0;
            int currentBuffer = 0;
            int currentOffset = 0;

            if (_sendQueue.IsReady(this, out int sequenceNumber))
            {
                bool completed = SocketPal.TryCompleteSendTo(_socket, buffers, ref currentBuffer, ref currentOffset, flags, socketAddress.Span, ref bytesSent, out SocketError immediateError);
                if (completed || !ShouldRetrySyncOperation(out immediateError))
                {
                    return immediateError;
                }
            }

            var sendOperation = new BufferListSendOperation(this)
            {
                Buffers = buffers,
                BufferIndex = currentBuffer,
                Offset = currentOffset,
                Flags = flags,
                SocketAddress = socketAddress,
                BytesTransferred = bytesSent
            };
            PerformSyncOperation(ref _sendQueue, sendOperation, timeout, sequenceNumber);

            bytesSent = sendOperation.BytesTransferred;
            return sendOperation.ErrorCode;
        }

        // Asynchronous send-to from a list of buffers. Switches the handle to non-blocking mode and tries
        // to send inline when the send queue is ready. If that does not complete, a rented
        // BufferListSendOperation is started (without a cancellation token): SocketError.IOPending is
        // returned when it was queued, otherwise the operation's results are returned and the operation
        // is handed back via ReturnOperation.
        public SocketError SendToAsync(IList<ArraySegment<byte>> buffers, SocketFlags flags, Memory<byte> socketAddress, out int bytesSent, Action<int, Memory<byte>, SocketFlags, SocketError> callback)
        {
            SetHandleNonBlocking();

            bytesSent = 0;
            int currentBuffer = 0;
            int currentOffset = 0;

            if (_sendQueue.IsReady(this, out int sequenceNumber))
            {
                if (SocketPal.TryCompleteSendTo(_socket, buffers, ref currentBuffer, ref currentOffset, flags, socketAddress.Span, ref bytesSent, out SocketError immediateResult))
                {
                    return immediateResult;
                }
            }

            BufferListSendOperation sendOperation = RentBufferListSendOperation();
            sendOperation.Callback = callback;
            sendOperation.Buffers = buffers;
            sendOperation.BufferIndex = currentBuffer;
            sendOperation.Offset = currentOffset;
            sendOperation.Flags = flags;
            sendOperation.SocketAddress = socketAddress;
            sendOperation.BytesTransferred = bytesSent;

            if (_sendQueue.StartAsyncOperation(this, sendOperation, sequenceNumber))
            {
                return SocketError.IOPending;
            }

            // Not left pending: read the results before the operation is returned.
            bytesSent = sendOperation.BytesTransferred;
            SocketError result = sendOperation.ErrorCode;

            ReturnOperation(sendOperation);
            return result;
        }

        // Synchronous file send. When the send queue is ready an immediate send is attempted; if it
        // completes, or ShouldRetrySyncOperation says not to retry, that result is returned. Otherwise a
        // SendFileOperation seeded with the updated offset, count and byte total is run through
        // PerformSyncOperation with the supplied timeout.
        // Throws PlatformNotSupportedException when Socket.OSSupportsThreads is false.
        public SocketError SendFile(SafeFileHandle fileHandle, long offset, long count, int timeout, out long bytesSent)
        {
            if (!Socket.OSSupportsThreads)
            {
                throw new PlatformNotSupportedException();
            }

            Debug.Assert(timeout == -1 || timeout > 0, $"Unexpected timeout: {timeout}");

            bytesSent = 0;

            if (_sendQueue.IsReady(this, out int sequenceNumber))
            {
                bool completed = SocketPal.TryCompleteSendFile(_socket, fileHandle, ref offset, ref count, ref bytesSent, out SocketError immediateError);
                if (completed || !ShouldRetrySyncOperation(out immediateError))
                {
                    return immediateError;
                }
            }

            var fileOperation = new SendFileOperation(this)
            {
                FileHandle = fileHandle,
                Offset = offset,
                Count = count,
                BytesTransferred = bytesSent
            };
            PerformSyncOperation(ref _sendQueue, fileOperation, timeout, sequenceNumber);

            bytesSent = fileOperation.BytesTransferred;
            return fileOperation.ErrorCode;
        }

        // Asynchronous file send. Switches the handle to non-blocking mode and tries to send inline when
        // the send queue is ready. If that does not complete, a new SendFileOperation is started on the
        // send queue: SocketError.IOPending is returned when it was queued, otherwise the operation's
        // byte total and error code are returned.
        public SocketError SendFileAsync(SafeFileHandle fileHandle, long offset, long count, out long bytesSent, Action<long, SocketError> callback, CancellationToken cancellationToken = default)
        {
            SetHandleNonBlocking();

            bytesSent = 0;

            if (_sendQueue.IsReady(this, out int sequenceNumber))
            {
                if (SocketPal.TryCompleteSendFile(_socket, fileHandle, ref offset, ref count, ref bytesSent, out SocketError immediateResult))
                {
                    return immediateResult;
                }
            }

            var fileOperation = new SendFileOperation(this)
            {
                Callback = callback,
                FileHandle = fileHandle,
                Offset = offset,
                Count = count,
                BytesTransferred = bytesSent
            };

            if (_sendQueue.StartAsyncOperation(this, fileOperation, sequenceNumber, cancellationToken))
            {
                return SocketError.IOPending;
            }

            // Not left pending: surface the operation's results directly.
            bytesSent = fileOperation.BytesTransferred;
            return fileOperation.ErrorCode;
        }

        // Runs on the epoll thread. Makes a best-effort attempt to process synchronous events (and errors destined
        // for synchronous operations) right here, and returns whichever events still remain to be processed.
        //
        // The checks are intentionally lock-free: taking each operation queue's lock so that synchronous events could
        // be handled deterministically on the epoll thread seems to significantly reduce throughput in benchmarks.
        // The cost is nondeterminism - because of a race, a stale cache, etc., the epoll thread may conclude that the
        // next operation in a queue is not synchronous when it actually is, and the event then gets scheduled instead.
        // Scheduling the release of a synchronous operation is not functionally incorrect, but it may lead to thread
        // pool starvation when the synchronous operations are blocking thread pool threads (typically not advised)
        // and more threads are not immediately available to run the work items that would release them.
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public Interop.Sys.SocketEvents HandleSyncEventsSpeculatively(Interop.Sys.SocketEvents events)
        {
            if ((events & Interop.Sys.SocketEvents.Error) != 0)
            {
                // Translate Error into Read + Write; the processing for those events will pick up the error.
                events = (events & ~Interop.Sys.SocketEvents.Error)
                    | Interop.Sys.SocketEvents.Read
                    | Interop.Sys.SocketEvents.Write;
            }

            if ((events & Interop.Sys.SocketEvents.Read) != 0)
            {
                // Speculative check first; the queue is only touched when its next operation looks synchronous.
                if (_receiveQueue.IsNextOperationSynchronous_Speculative &&
                    _receiveQueue.ProcessSyncEventOrGetAsyncEvent(this, skipAsyncEvents: true) == null)
                {
                    // Nothing was handed back for further processing, so Read is consumed.
                    events &= ~Interop.Sys.SocketEvents.Read;
                }
            }

            if ((events & Interop.Sys.SocketEvents.Write) != 0)
            {
                if (_sendQueue.IsNextOperationSynchronous_Speculative &&
                    _sendQueue.ProcessSyncEventOrGetAsyncEvent(this, skipAsyncEvents: true) == null)
                {
                    // Nothing was handed back for further processing, so Write is consumed.
                    events &= ~Interop.Sys.SocketEvents.Write;
                }
            }

            return events;
        }

        // Runs on the epoll thread. Processes the signalled events right here: for each of Read and Write that
        // is set, asks the matching queue for an operation and, when one is returned, processes it inline.
        public void HandleEventsInline(Interop.Sys.SocketEvents events)
        {
            if ((events & Interop.Sys.SocketEvents.Error) != 0)
            {
                // Translate Error into Read + Write; the processing for those events will pick up the error.
                events = (events & ~Interop.Sys.SocketEvents.Error)
                    | Interop.Sys.SocketEvents.Read
                    | Interop.Sys.SocketEvents.Write;
            }

            bool readSignalled = (events & Interop.Sys.SocketEvents.Read) != 0;
            bool writeSignalled = (events & Interop.Sys.SocketEvents.Write) != 0;

            // Receive side is always handled before the send side.
            if (readSignalled)
            {
                _receiveQueue.ProcessSyncEventOrGetAsyncEvent(this)?.Process();
            }

            if (writeSignalled)
            {
                _sendQueue.ProcessSyncEventOrGetAsyncEvent(this)?.Process();
            }
        }

        // Runs on a ThreadPool thread. Error events are expected to have been translated away before this
        // point (asserted below); only Read and Write are examined.
        public void HandleEvents(Interop.Sys.SocketEvents events)
        {
            Debug.Assert((events & Interop.Sys.SocketEvents.Error) == 0);

            AsyncOperation? receiveOperation = null;
            if ((events & Interop.Sys.SocketEvents.Read) != 0)
            {
                receiveOperation = _receiveQueue.ProcessSyncEventOrGetAsyncEvent(this);
            }

            AsyncOperation? sendOperation = null;
            if ((events & Interop.Sys.SocketEvents.Write) != 0)
            {
                sendOperation = _sendQueue.ProcessSyncEventOrGetAsyncEvent(this);
            }

            // We are already on a thread pool thread. With a single operation, run it synchronously and save
            // a work item. With two, running both back-to-back may delay the second one, so the receive is
            // scheduled onto the thread pool while the send runs here. There might be better ways of doing this.
            if (sendOperation != null)
            {
                receiveOperation?.Schedule();
                sendOperation.Process();
                return;
            }

            receiveOperation?.Process();
        }

        //
        // Tracing stuff
        //

        // To enable tracing:
        // (1) Add reference to System.Console in the csproj
        // (2) #define SOCKETASYNCCONTEXT_TRACE

        // Emits one trace line of the form "<id of this context>.<calling member>: <message>".
        // memberName is filled in by the compiler via [CallerMemberName] when the caller omits it.
        // Being [Conditional], calls to this method are only compiled in when SOCKETASYNCCONTEXT_TRACE is defined.
        [Conditional("SOCKETASYNCCONTEXT_TRACE")]
        public void Trace(string message, [CallerMemberName] string? memberName = null)
        {
            // The line is built inside the argument so it is elided together with the conditional call.
            OutputTrace(IdOf(this) + "." + memberName + ": " + message);
        }

        // Sink for trace lines: writes the string to the console, but only when SOCKETASYNCCONTEXT_TRACE
        // is defined. Without that symbol the body is empty and calls to this method are compiled out.
        [Conditional("SOCKETASYNCCONTEXT_TRACE")]
        public static void OutputTrace(string s)
        {
#if SOCKETASYNCCONTEXT_TRACE
            // CONSIDER: Change to NetEventSource
            Console.Out.WriteLine(s);
#endif
        }

        // Builds a short identifier for an object for use in trace output: "<TypeName>#<hash code in hex>",
        // with the hash formatted as at least two uppercase hex digits. A null reference yields "(null)".
        public static string IdOf(object o)
        {
            if (o == null)
            {
                return "(null)";
            }

            string typeName = o.GetType().Name;
            int hashCode = o.GetHashCode();
            return $"{typeName}#{hashCode:X2}";
        }
    }
}
